spark git commit: [SPARK-8644] Include call site in SparkException stack traces thrown by job failures
Repository: spark Updated Branches: refs/heads/master 031d7d414 - 57e9b13bf [SPARK-8644] Include call site in SparkException stack traces thrown by job failures Example exception (new part at bottom, clearly demarcated): ``` org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.RuntimeException: uh-oh! at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:880) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38$$anonfun$apply$mcJ$sp$2.apply(DAGSchedulerSuite.scala:880) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1640) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1777) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1777) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1298) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1289) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1288) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1288) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:755) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:755) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:755) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1509) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1470) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1459) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:560) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1744) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1762) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1777) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1791) at org.apache.spark.rdd.RDD.count(RDD.scala:1099) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply$mcJ$sp(DAGSchedulerSuite.scala:880) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply(DAGSchedulerSuite.scala:880) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37$$anonfun$38.apply(DAGSchedulerSuite.scala:880) at org.scalatest.Assertions$class.intercept(Assertions.scala:997) at org.scalatest.FunSuite.intercept(FunSuite.scala:1555) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply$mcV$sp(DAGSchedulerSuite.scala:879) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply(DAGSchedulerSuite.scala:878) at org.apache.spark.scheduler.DAGSchedulerSuite$$anonfun$37.apply(DAGSchedulerSuite.scala:878) at 
org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22) at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:42) at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163) at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175) at
spark git commit: [SPARK-7183] [NETWORK] Fix memory leak of TransportRequestHandler.streamIds
Repository: spark Updated Branches: refs/heads/master 1262e310c - 168603272 [SPARK-7183] [NETWORK] Fix memory leak of TransportRequestHandler.streamIds JIRA: https://issues.apache.org/jira/browse/SPARK-7183 Author: Liang-Chi Hsieh vii...@gmail.com Closes #5743 from viirya/fix_requesthandler_memory_leak and squashes the following commits: cf2c086 [Liang-Chi Hsieh] For comments. 97e205c [Liang-Chi Hsieh] Remove unused import. d35f19a [Liang-Chi Hsieh] For comments. f9a0c37 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into fix_requesthandler_memory_leak 45908b7 [Liang-Chi Hsieh] for style. 17f020f [Liang-Chi Hsieh] Remove unused import. 37a4b6c [Liang-Chi Hsieh] Remove streamIds from TransportRequestHandler. 3b3f38a [Liang-Chi Hsieh] Fix memory leak of TransportRequestHandler.streamIds. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16860327 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16860327 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16860327 Branch: refs/heads/master Commit: 16860327286bc08b4e2283d51b4c8fe024ba5006 Parents: 1262e31 Author: Liang-Chi Hsieh vii...@gmail.com Authored: Fri May 1 11:59:12 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri May 1 11:59:12 2015 -0700 -- .../network/server/OneForOneStreamManager.java | 35 +++- .../spark/network/server/StreamManager.java | 19 --- .../network/server/TransportRequestHandler.java | 14 ++-- 3 files changed, 44 insertions(+), 24 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16860327/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java index a6d390e..c95e64e 100644 --- 
a/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java +++ b/network/common/src/main/java/org/apache/spark/network/server/OneForOneStreamManager.java @@ -20,14 +20,18 @@ package org.apache.spark.network.server; import java.util.Iterator; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; +import io.netty.channel.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.spark.network.buffer.ManagedBuffer; +import com.google.common.base.Preconditions; + /** * StreamManager which allows registration of an Iterator&lt;ManagedBuffer&gt;, which are individually * fetched as chunks by the client. Each registered buffer is one chunk. @@ -36,18 +40,21 @@ public class OneForOneStreamManager extends StreamManager { private final Logger logger = LoggerFactory.getLogger(OneForOneStreamManager.class); private final AtomicLong nextStreamId; - private final Map<Long, StreamState> streams; + private final ConcurrentHashMap<Long, StreamState> streams; /** State of a single stream. */ private static class StreamState { final Iterator<ManagedBuffer> buffers; +// The channel associated to the stream +Channel associatedChannel = null; + // Used to keep track of the index of the buffer that the user has retrieved, just to ensure // that the caller only requests each chunk one at a time, in order.
int curChunk = 0; StreamState(Iterator<ManagedBuffer> buffers) { - this.buffers = buffers; + this.buffers = Preconditions.checkNotNull(buffers); } } @@ -59,6 +66,13 @@ public class OneForOneStreamManager extends StreamManager { } @Override + public void registerChannel(Channel channel, long streamId) { +if (streams.containsKey(streamId)) { + streams.get(streamId).associatedChannel = channel; +} + } + + @Override public ManagedBuffer getChunk(long streamId, int chunkIndex) { StreamState state = streams.get(streamId); if (chunkIndex != state.curChunk) { @@ -80,12 +94,17 @@ public class OneForOneStreamManager extends StreamManager { } @Override - public void connectionTerminated(long streamId) { -// Release all remaining buffers. -StreamState state = streams.remove(streamId); -if (state != null && state.buffers != null) { - while (state.buffers.hasNext()) { -state.buffers.next().release(); + public void connectionTerminated(Channel channel) { +// Close all streams which have been associated with the channel. +for (Map.Entry<Long, StreamState> entry: streams.entrySet()) { + StreamState state = entry.getValue(); + if (state.associatedChannel
spark git commit: [SPARK-7003] Improve reliability of connection failure detection between Netty block transfer service endpoints
Repository: spark Updated Branches: refs/heads/master 1be207078 - 968ad9721 [SPARK-7003] Improve reliability of connection failure detection between Netty block transfer service endpoints Currently we rely on the assumption that an exception will be raised and the channel closed if two endpoints cannot communicate over a Netty TCP channel. However, this guarantee does not hold in all network environments, and [SPARK-6962](https://issues.apache.org/jira/browse/SPARK-6962) seems to point to a case where only the server side of the connection detected a fault. This patch improves robustness of fetch/rpc requests by having an explicit timeout in the transport layer which closes the connection if there is a period of inactivity while there are outstanding requests. NB: This patch is actually only around 50 lines added if you exclude the testing-related code. Author: Aaron Davidson aa...@databricks.com Closes #5584 from aarondav/timeout and squashes the following commits: 8699680 [Aaron Davidson] Address Reynold's comments 37ce656 [Aaron Davidson] [SPARK-7003] Improve reliability of connection failure detection between Netty block transfer service endpoints Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/968ad972 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/968ad972 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/968ad972 Branch: refs/heads/master Commit: 968ad972175390bb0a96918fd3c7b318d70fa466 Parents: 1be2070 Author: Aaron Davidson aa...@databricks.com Authored: Mon Apr 20 09:54:21 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Apr 20 09:54:21 2015 -0700 -- .../apache/spark/network/TransportContext.java | 5 +- .../client/TransportResponseHandler.java| 14 +- .../network/server/TransportChannelHandler.java | 33 ++- .../spark/network/util/MapConfigProvider.java | 41 +++ .../apache/spark/network/util/NettyUtils.java | 2 +- 
.../network/RequestTimeoutIntegrationSuite.java | 277 +++ .../network/TransportClientFactorySuite.java| 21 +- 7 files changed, 375 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/968ad972/network/common/src/main/java/org/apache/spark/network/TransportContext.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java index f0a89c9..3fe69b1 100644 --- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java +++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java @@ -22,6 +22,7 @@ import java.util.List; import com.google.common.collect.Lists; import io.netty.channel.Channel; import io.netty.channel.socket.SocketChannel; +import io.netty.handler.timeout.IdleStateHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +107,7 @@ public class TransportContext { .addLast(encoder, encoder) .addLast(frameDecoder, NettyUtils.createFrameDecoder()) .addLast(decoder, decoder) +.addLast(idleStateHandler, new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000)) // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this // would require more logic to guarantee if this were not part of the same event loop. 
.addLast(handler, channelHandler); @@ -126,7 +128,8 @@ public class TransportContext { TransportClient client = new TransportClient(channel, responseHandler); TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client, rpcHandler); -return new TransportChannelHandler(client, responseHandler, requestHandler); +return new TransportChannelHandler(client, responseHandler, requestHandler, + conf.connectionTimeoutMs()); } public TransportConf getConf() { return conf; } http://git-wip-us.apache.org/repos/asf/spark/blob/968ad972/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java index 2044afb..94fc21a 100644 --- a/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java +++ b/network/common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java @@ -20,8 +20,8 @@ package org.apache.spark.network.client; import java.io.IOException; import java.util.Map; import
spark git commit: [Minor] [SQL] [SPARK-6729] Minor fix for DriverQuirks get
Repository: spark Updated Branches: refs/heads/master 30363ede8 - e40ea8742 [Minor] [SQL] [SPARK-6729] Minor fix for DriverQuirks get The function uses .substring(0, X), which will trigger OutOfBoundsException if string length is less than X. A better way to do this is to use startsWith, which won't error out in this case. Author: Volodymyr Lyubinets vlyu...@gmail.com Closes #5378 from vlyubin/quirks and squashes the following commits: 504e8e0 [Volodymyr Lyubinets] Minor fix for DriverQuirks get Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e40ea874 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e40ea874 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e40ea874 Branch: refs/heads/master Commit: e40ea8742a8771ecd46b182f45b5fcd8bd6dd725 Parents: 30363ed Author: Volodymyr Lyubinets vlyu...@gmail.com Authored: Mon Apr 6 18:00:51 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Apr 6 18:00:51 2015 -0700 -- .../src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e40ea874/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala index 1704be7..0feabc4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DriverQuirks.scala @@ -49,9 +49,9 @@ private[sql] object DriverQuirks { * Fetch the DriverQuirks class corresponding to a given database url. 
*/ def get(url: String): DriverQuirks = { -if (url.substring(0, 10).equals("jdbc:mysql")) { +if (url.startsWith("jdbc:mysql")) { new MySQLQuirks() -} else if (url.substring(0, 15).equals("jdbc:postgresql")) { +} else if (url.startsWith("jdbc:postgresql")) { new PostgresQuirks() } else { new NoQuirks() - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6122][Core] Upgrade Tachyon client version to 0.6.1.
Repository: spark Updated Branches: refs/heads/master 6ef48632f - a41b9c600 [SPARK-6122][Core] Upgrade Tachyon client version to 0.6.1. Changes the Tachyon client version from 0.5 to 0.6 in spark core and distribution script. New dependencies in Tachyon 0.6.0 include commons-codec:commons-codec:jar:1.5:compile io.netty:netty-all:jar:4.0.23.Final:compile These are already in spark core. Author: Calvin Jia jia.cal...@gmail.com Closes #4867 from calvinjia/upgrade_tachyon_0.6.0 and squashes the following commits: eed9230 [Calvin Jia] Update tachyon version to 0.6.1. 11907b3 [Calvin Jia] Use TachyonURI for tachyon paths instead of strings. 71bf441 [Calvin Jia] Upgrade Tachyon client version to 0.6.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a41b9c60 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a41b9c60 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a41b9c60 Branch: refs/heads/master Commit: a41b9c6004cfee84bd56dfa1faf5a0cf084551ae Parents: 6ef4863 Author: Calvin Jia jia.cal...@gmail.com Authored: Sun Mar 22 11:11:29 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Mar 22 11:11:29 2015 -0700 -- core/pom.xml| 2 +- .../spark/storage/TachyonBlockManager.scala | 27 ++-- .../scala/org/apache/spark/util/Utils.scala | 4 ++- make-distribution.sh| 2 +- 4 files changed, 18 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/a41b9c60/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 6cd1965..868834d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -275,7 +275,7 @@ dependency groupIdorg.tachyonproject/groupId artifactIdtachyon-client/artifactId - version0.5.0/version + version0.6.1/version exclusions exclusion groupIdorg.apache.hadoop/groupId http://git-wip-us.apache.org/repos/asf/spark/blob/a41b9c60/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index af87303..2ab6a8f 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -20,8 +20,8 @@ package org.apache.spark.storage import java.text.SimpleDateFormat import java.util.{Date, Random} -import tachyon.client.TachyonFS -import tachyon.client.TachyonFile +import tachyon.TachyonURI +import tachyon.client.{TachyonFile, TachyonFS} import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode @@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager( val master: String) extends Logging { - val client = if (master != null && master != "") TachyonFS.get(master) else null + val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") @@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager( addShutdownHook() def removeFile(file: TachyonFile): Boolean = { -client.delete(file.getPath(), false) +client.delete(new TachyonURI(file.getPath()), false) } def fileExists(file: TachyonFile): Boolean = { -client.exist(file.getPath()) +client.exist(new TachyonURI(file.getPath())) } def getFile(filename: String): TachyonFile = { @@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager( if (old != null) { old } else { - val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") client.mkdir(path) val newDir = client.getFile(path) subDirs(dirId)(subDirId) = newDir @@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager( } } } -val filePath = subDir + "/" + filename +val filePath = new TachyonURI(s"$subDir/$filename") if(!client.exist(filePath)) { client.createFile(filePath) } @@ -101,7 +101,7 @@
private[spark] class TachyonBlockManager( // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. private def createTachyonDirs(): Array[TachyonFile] = { -logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") +logDebug(s"Creating tachyon directories at root dirs '$rootDirs'") val dateFormat = new
spark git commit: [SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop
Repository: spark Updated Branches: refs/heads/master 645cf3fcc -> 2c3f83c34 [SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop https://issues.apache.org/jira/browse/SPARK-4012 This patch is a resubmission for https://github.com/apache/spark/pull/2864 What I am proposing in this patch is that ***when an exception is thrown from an infinite loop, we should stop the SparkContext instead of letting the JVM throw exceptions forever***. So, in the infinite loops that were originally wrapped with `logUncaughtExceptions`, I changed the wrapper to `tryOrStopSparkContext`, so that the Spark component is stopped. An early-stopped JVM process is helpful for HA scheme design. For example, a user may have a script that checks for the existence of the Spark Streaming driver's pid to monitor availability; with the code before this patch, the JVM process stays alive but is not functional after the exceptions are thrown. andrewor14, srowen, would you mind taking another look at the change?
Author: CodingCat zhunans...@gmail.com Closes #5004 from CodingCat/SPARK-4012-1 and squashes the following commits: 589276a [CodingCat] throw fatal error again 3c72cd8 [CodingCat] address the comments 6087864 [CodingCat] revise comments 6ad3eb0 [CodingCat] stop SparkContext instead of quit the JVM process 6322959 [CodingCat] exit JVM process when the exception is thrown from an infinite loop Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c3f83c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c3f83c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c3f83c3 Branch: refs/heads/master Commit: 2c3f83c34bb8d2c1bf13b33633d8c5a8089545d1 Parents: 645cf3f Author: CodingCat zhunans...@gmail.com Authored: Wed Mar 18 23:48:45 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Mar 18 23:48:45 2015 -0700 -- .../scala/org/apache/spark/ContextCleaner.scala | 2 +- .../scala/org/apache/spark/SparkContext.scala | 2 +- .../deploy/history/FsHistoryProvider.scala | 2 +- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../spark/util/AsynchronousListenerBus.scala| 10 +-- .../scala/org/apache/spark/util/Utils.scala | 28 .../scheduler/EventLoggingListenerSuite.scala | 9 --- .../spark/scheduler/SparkListenerSuite.scala| 10 +++ .../streaming/scheduler/JobScheduler.scala | 2 +- 9 files changed, 51 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/ContextCleaner.scala -- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index 0c59a61..9b05c96 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { } /** Keep cleaning RDD, shuffle, and broadcast state. 
*/ - private def keepCleaning(): Unit = Utils.logUncaughtExceptions { + private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { while (!stopped) { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4457f40..228ff71 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } -listenerBus.start() +listenerBus.start(this) } /** Post the application start event */ http://git-wip-us.apache.org/repos/asf/spark/blob/2c3f83c3/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 16d88c1..7fde020 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis */ private def
spark git commit: [SPARK-6330] Fix filesystem bug in newParquet relation
Repository: spark Updated Branches: refs/heads/master 12a345adc - d19efeddc [SPARK-6330] Fix filesystem bug in newParquet relation If I'm running this locally and my path points to S3, this would currently error out because of incorrect FS. I tested this in a scenario that previously didn't work, this change seemed to fix the issue. Author: Volodymyr Lyubinets vlyu...@gmail.com Closes #5020 from vlyubin/parquertbug and squashes the following commits: a645ad5 [Volodymyr Lyubinets] Fix filesystem bug in newParquet relation Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d19efedd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d19efedd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d19efedd Branch: refs/heads/master Commit: d19efeddc0cb710c9496af11e447d39e1ad61b31 Parents: 12a345a Author: Volodymyr Lyubinets vlyu...@gmail.com Authored: Mon Mar 16 12:13:18 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Mar 16 12:13:18 2015 -0700 -- .../main/scala/org/apache/spark/sql/parquet/newParquet.scala| 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d19efedd/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 234e6bb..c38b6e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet import java.io.IOException import java.lang.{Double = JDouble, Float = JFloat, Long = JLong} import java.math.{BigDecimal = JBigDecimal} +import java.net.URI import java.text.SimpleDateFormat import java.util.{Date, List = JList} @@ -244,11 +245,10 @@ private[sql] case class ParquetRelation2( * 
Refreshes `FileStatus`es, footers, partition spec, and table schema. */ def refresh(): Unit = { - val fs = FileSystem.get(sparkContext.hadoopConfiguration) - // Support either reading a collection of raw Parquet part-files, or a collection of folders // containing Parquet files (e.g. partitioned Parquet table). val baseStatuses = paths.distinct.map { p => +val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration) val qualified = fs.makeQualified(new Path(p)) if (!fs.exists(qualified) && maybeSchema.isDefined) { @@ -262,6 +262,7 @@ private[sql] case class ParquetRelation2( // Lists `FileStatus`es of all leaf nodes (files) under all base directories. val leaves = baseStatuses.flatMap { f => +val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration) SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f => isSummaryFile(f.getPath) || !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-6330] Fix filesystem bug in newParquet relation
Repository: spark Updated Branches: refs/heads/branch-1.3 684ff2476 - 67fa6d1f8 [SPARK-6330] Fix filesystem bug in newParquet relation If I'm running this locally and my path points to S3, this would currently error out because of incorrect FS. I tested this in a scenario that previously didn't work, this change seemed to fix the issue. Author: Volodymyr Lyubinets vlyu...@gmail.com Closes #5020 from vlyubin/parquertbug and squashes the following commits: a645ad5 [Volodymyr Lyubinets] Fix filesystem bug in newParquet relation Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67fa6d1f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67fa6d1f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67fa6d1f Branch: refs/heads/branch-1.3 Commit: 67fa6d1f830dee37244b5a30684d797093c7c134 Parents: 684ff24 Author: Volodymyr Lyubinets vlyu...@gmail.com Authored: Mon Mar 16 12:13:18 2015 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Mar 16 12:14:41 2015 -0700 -- .../main/scala/org/apache/spark/sql/parquet/newParquet.scala| 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67fa6d1f/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 234e6bb..c38b6e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.parquet import java.io.IOException import java.lang.{Double = JDouble, Float = JFloat, Long = JLong} import java.math.{BigDecimal = JBigDecimal} +import java.net.URI import java.text.SimpleDateFormat import java.util.{Date, List = JList} @@ -244,11 +245,10 @@ private[sql] case class 
ParquetRelation2( * Refreshes `FileStatus`es, footers, partition spec, and table schema. */ def refresh(): Unit = { - val fs = FileSystem.get(sparkContext.hadoopConfiguration) - // Support either reading a collection of raw Parquet part-files, or a collection of folders // containing Parquet files (e.g. partitioned Parquet table). val baseStatuses = paths.distinct.map { p => +val fs = FileSystem.get(URI.create(p), sparkContext.hadoopConfiguration) val qualified = fs.makeQualified(new Path(p)) if (!fs.exists(qualified) && maybeSchema.isDefined) { @@ -262,6 +262,7 @@ private[sql] case class ParquetRelation2( // Lists `FileStatus`es of all leaf nodes (files) under all base directories. val leaves = baseStatuses.flatMap { f => +val fs = FileSystem.get(f.getPath.toUri, sparkContext.hadoopConfiguration) SparkHadoopUtil.get.listLeafStatuses(fs, f.getPath).filter { f => isSummaryFile(f.getPath) || !(f.getPath.getName.startsWith("_") || f.getPath.getName.startsWith(".")) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-5073] spark.storage.memoryMapThreshold has two default values
Repository: spark Updated Branches: refs/heads/master 331326090 -> 1656aae2b [SPARK-5073] spark.storage.memoryMapThreshold has two default values Because the page size of most operating systems is about 4KB, the default value of spark.storage.memoryMapThreshold was originally proposed to be unified to 2 * 4096; the final change unifies it to 2MB (2 * 1024 * 1024 bytes). Author: lewuathe lewua...@me.com Closes #3900 from Lewuathe/integrate-memoryMapThreshold and squashes the following commits: e417acd [lewuathe] [SPARK-5073] Update docs/configuration 834aba4 [lewuathe] [SPARK-5073] Fix style adcea33 [lewuathe] [SPARK-5073] Integrate memory map threshold to 2MB fcce2e5 [lewuathe] [SPARK-5073] spark.storage.memoryMapThreshold have two default value Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1656aae2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1656aae2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1656aae2 Branch: refs/heads/master Commit: 1656aae2b4e8b026f8cfe782519f72d32ed2b291 Parents: 3313260 Author: lewuathe lewua...@me.com Authored: Sun Jan 11 13:50:42 2015 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Jan 11 13:50:42 2015 -0800 -- core/src/main/scala/org/apache/spark/storage/DiskStore.scala | 3 ++- docs/configuration.md| 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1656aae2/core/src/main/scala/org/apache/spark/storage/DiskStore.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 8dadf67..61ef5ff 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -31,7 +31,8 @@ import org.apache.spark.util.Utils private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager) extends BlockStore(blockManager) with Logging { - val minMemoryMapBytes = 
blockManager.conf.getLong(spark.storage.memoryMapThreshold, 2 * 4096L) + val minMemoryMapBytes = blockManager.conf.getLong( +spark.storage.memoryMapThreshold, 2 * 1024L * 1024L) override def getSize(blockId: BlockId): Long = { diskManager.getFile(blockId.name).length http://git-wip-us.apache.org/repos/asf/spark/blob/1656aae2/docs/configuration.md -- diff --git a/docs/configuration.md b/docs/configuration.md index 2add485..f292bfb 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -678,7 +678,7 @@ Apart from these, the following properties are also available, and may be useful /tr tr tdcodespark.storage.memoryMapThreshold/code/td - td8192/td + td2097152/td td Size of a block, in bytes, above which Spark memory maps when reading a block from disk. This prevents Spark from memory mapping very small blocks. In general, memory - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [Minor] Fix test RetryingBlockFetcherSuite after changed config name
Repository: spark Updated Branches: refs/heads/master f3da4bd72 - b4034c3f8 [Minor] Fix test RetryingBlockFetcherSuite after changed config name Flakey due to the default retry interval being the same as our test's wait timeout. Author: Aaron Davidson aa...@databricks.com Closes #3972 from aarondav/fix-test and squashes the following commits: db77cab [Aaron Davidson] [Minor] Fix test after changed config name Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4034c3f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4034c3f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4034c3f Branch: refs/heads/master Commit: b4034c3f889bf24f60eb806802866b48e4cbe55c Parents: f3da4bd Author: Aaron Davidson aa...@databricks.com Authored: Fri Jan 9 09:20:16 2015 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Fri Jan 9 09:20:16 2015 -0800 -- .../apache/spark/network/shuffle/RetryingBlockFetcherSuite.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b4034c3f/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java -- diff --git a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java index 0191fe5..1ad0d72 100644 --- a/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java +++ b/network/shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java @@ -54,13 +54,13 @@ public class RetryingBlockFetcherSuite { @Before public void beforeEach() { System.setProperty(spark.shuffle.io.maxRetries, 2); -System.setProperty(spark.shuffle.io.retryWaitMs, 0); +System.setProperty(spark.shuffle.io.retryWait, 0); } @After public void afterEach() { System.clearProperty(spark.shuffle.io.maxRetries); 
-System.clearProperty(spark.shuffle.io.retryWaitMs); +System.clearProperty(spark.shuffle.io.retryWait); } @Test - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-4805 [CORE] BlockTransferMessage.toByteArray() trips assertion
Repository: spark Updated Branches: refs/heads/master 5e4c06f8e - d8f84f26e SPARK-4805 [CORE] BlockTransferMessage.toByteArray() trips assertion Allocate enough room for type byte as well as message, to avoid tripping assertion about capacity of the buffer Author: Sean Owen so...@cloudera.com Closes #3650 from srowen/SPARK-4805 and squashes the following commits: 9e1d502 [Sean Owen] Allocate enough room for type byte as well as message, to avoid tripping assertion about capacity of the buffer Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8f84f26 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8f84f26 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8f84f26 Branch: refs/heads/master Commit: d8f84f26e388055ca7459810e001d05ab60af15b Parents: 5e4c06f Author: Sean Owen so...@cloudera.com Authored: Tue Dec 9 16:38:27 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Dec 9 16:38:27 2014 -0800 -- .../spark/network/shuffle/protocol/BlockTransferMessage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d8f84f26/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index b4b13b8..6c1210b 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -67,7 +67,8 @@ public abstract class BlockTransferMessage implements Encodable { /** Serializes the 'type' byte followed by the message itself. 
*/ public byte[] toByteArray() { -ByteBuf buf = Unpooled.buffer(encodedLength()); +// Allow room for encoded message, plus the type byte +ByteBuf buf = Unpooled.buffer(encodedLength() + 1); buf.writeByte(type().id); encode(buf); assert buf.writableBytes() == 0 : Writable bytes remain: + buf.writableBytes(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: SPARK-4805 [CORE] BlockTransferMessage.toByteArray() trips assertion
Repository: spark Updated Branches: refs/heads/branch-1.2 51da2c557 - b0d64e572 SPARK-4805 [CORE] BlockTransferMessage.toByteArray() trips assertion Allocate enough room for type byte as well as message, to avoid tripping assertion about capacity of the buffer Author: Sean Owen so...@cloudera.com Closes #3650 from srowen/SPARK-4805 and squashes the following commits: 9e1d502 [Sean Owen] Allocate enough room for type byte as well as message, to avoid tripping assertion about capacity of the buffer (cherry picked from commit d8f84f26e388055ca7459810e001d05ab60af15b) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0d64e57 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0d64e57 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0d64e57 Branch: refs/heads/branch-1.2 Commit: b0d64e57255e5ca545c90f18bd9d10a07ae43759 Parents: 51da2c5 Author: Sean Owen so...@cloudera.com Authored: Tue Dec 9 16:38:27 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Dec 9 16:38:49 2014 -0800 -- .../spark/network/shuffle/protocol/BlockTransferMessage.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b0d64e57/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index b4b13b8..6c1210b 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -67,7 +67,8 @@ public abstract class BlockTransferMessage implements Encodable { /** Serializes the 'type' byte followed by 
the message itself. */ public byte[] toByteArray() { -ByteBuf buf = Unpooled.buffer(encodedLength()); +// Allow room for encoded message, plus the type byte +ByteBuf buf = Unpooled.buffer(encodedLength() + 1); buf.writeByte(type().id); encode(buf); assert buf.writableBytes() == 0 : Writable bytes remain: + buf.writableBytes(); - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: Config updates for the new shuffle transport.
Repository: spark Updated Branches: refs/heads/master 2b9b72682 - 9bd9334f5 Config updates for the new shuffle transport. Author: Reynold Xin r...@databricks.com Closes #3657 from rxin/conf-update and squashes the following commits: 7370eab [Reynold Xin] Config updates for the new shuffle transport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bd9334f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bd9334f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bd9334f Branch: refs/heads/master Commit: 9bd9334f588dbb44d01554f9f4ca68a153a48993 Parents: 2b9b726 Author: Reynold Xin r...@databricks.com Authored: Tue Dec 9 19:29:09 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Dec 9 19:29:09 2014 -0800 -- .../java/org/apache/spark/network/util/TransportConf.java| 8 .../org/apache/spark/network/sasl/SaslClientBootstrap.java | 2 +- .../apache/spark/network/shuffle/RetryingBlockFetcher.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9bd9334f/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index f605739..13b37f9 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -35,14 +35,14 @@ public class TransportConf { return conf.getBoolean(spark.shuffle.io.preferDirectBufs, true); } - /** Connect timeout in secs. Default 120 secs. */ + /** Connect timeout in milliseconds. Default 120 secs. */ public int connectionTimeoutMs() { return conf.getInt(spark.shuffle.io.connectionTimeout, 120) * 1000; } /** Number of concurrent connections between two nodes for fetching data. 
**/ public int numConnectionsPerPeer() { -return conf.getInt(spark.shuffle.io.numConnectionsPerPeer, 2); +return conf.getInt(spark.shuffle.io.numConnectionsPerPeer, 1); } /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */ @@ -67,7 +67,7 @@ public class TransportConf { public int sendBuf() { return conf.getInt(spark.shuffle.io.sendBuffer, -1); } /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ - public int saslRTTimeout() { return conf.getInt(spark.shuffle.sasl.timeout, 3); } + public int saslRTTimeoutMs() { return conf.getInt(spark.shuffle.sasl.timeout, 30) * 1000; } /** * Max number of times we will try IO exceptions (such as connection timeouts) per request. @@ -79,7 +79,7 @@ public class TransportConf { * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. * Only relevant if maxIORetries gt; 0. */ - public int ioRetryWaitTime() { return conf.getInt(spark.shuffle.io.retryWaitMs, 5000); } + public int ioRetryWaitTimeMs() { return conf.getInt(spark.shuffle.io.retryWait, 5) * 1000; } /** * Minimum size of a block that we should start using memory map rather than reading in through http://git-wip-us.apache.org/repos/asf/spark/blob/9bd9334f/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java index 7bc91e3..33aa134 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java @@ -59,7 +59,7 @@ public class SaslClientBootstrap implements TransportClientBootstrap { ByteBuf buf = Unpooled.buffer(msg.encodedLength()); msg.encode(buf); -byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout()); +byte[] 
response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs()); payload = saslClient.response(response); } } finally { http://git-wip-us.apache.org/repos/asf/spark/blob/9bd9334f/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
spark git commit: Config updates for the new shuffle transport.
Repository: spark Updated Branches: refs/heads/branch-1.2 441ec3451 - 5e5d8f469 Config updates for the new shuffle transport. Author: Reynold Xin r...@databricks.com Closes #3657 from rxin/conf-update and squashes the following commits: 7370eab [Reynold Xin] Config updates for the new shuffle transport. (cherry picked from commit 9bd9334f588dbb44d01554f9f4ca68a153a48993) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e5d8f46 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e5d8f46 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e5d8f46 Branch: refs/heads/branch-1.2 Commit: 5e5d8f469a1bea9bbe606f772ccdcab7c184c651 Parents: 441ec34 Author: Reynold Xin r...@databricks.com Authored: Tue Dec 9 19:29:09 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Dec 9 19:29:26 2014 -0800 -- .../java/org/apache/spark/network/util/TransportConf.java| 8 .../org/apache/spark/network/sasl/SaslClientBootstrap.java | 2 +- .../apache/spark/network/shuffle/RetryingBlockFetcher.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5e5d8f46/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java -- diff --git a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java index f605739..13b37f9 100644 --- a/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java +++ b/network/common/src/main/java/org/apache/spark/network/util/TransportConf.java @@ -35,14 +35,14 @@ public class TransportConf { return conf.getBoolean(spark.shuffle.io.preferDirectBufs, true); } - /** Connect timeout in secs. Default 120 secs. */ + /** Connect timeout in milliseconds. Default 120 secs. 
*/ public int connectionTimeoutMs() { return conf.getInt(spark.shuffle.io.connectionTimeout, 120) * 1000; } /** Number of concurrent connections between two nodes for fetching data. **/ public int numConnectionsPerPeer() { -return conf.getInt(spark.shuffle.io.numConnectionsPerPeer, 2); +return conf.getInt(spark.shuffle.io.numConnectionsPerPeer, 1); } /** Requested maximum length of the queue of incoming connections. Default -1 for no backlog. */ @@ -67,7 +67,7 @@ public class TransportConf { public int sendBuf() { return conf.getInt(spark.shuffle.io.sendBuffer, -1); } /** Timeout for a single round trip of SASL token exchange, in milliseconds. */ - public int saslRTTimeout() { return conf.getInt(spark.shuffle.sasl.timeout, 3); } + public int saslRTTimeoutMs() { return conf.getInt(spark.shuffle.sasl.timeout, 30) * 1000; } /** * Max number of times we will try IO exceptions (such as connection timeouts) per request. @@ -79,7 +79,7 @@ public class TransportConf { * Time (in milliseconds) that we will wait in order to perform a retry after an IOException. * Only relevant if maxIORetries gt; 0. 
*/ - public int ioRetryWaitTime() { return conf.getInt(spark.shuffle.io.retryWaitMs, 5000); } + public int ioRetryWaitTimeMs() { return conf.getInt(spark.shuffle.io.retryWait, 5) * 1000; } /** * Minimum size of a block that we should start using memory map rather than reading in through http://git-wip-us.apache.org/repos/asf/spark/blob/5e5d8f46/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java index 7bc91e3..33aa134 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/sasl/SaslClientBootstrap.java @@ -59,7 +59,7 @@ public class SaslClientBootstrap implements TransportClientBootstrap { ByteBuf buf = Unpooled.buffer(msg.encodedLength()); msg.encode(buf); -byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeout()); +byte[] response = client.sendRpcSync(buf.array(), conf.saslRTTimeoutMs()); payload = saslClient.response(response); } } finally { http://git-wip-us.apache.org/repos/asf/spark/blob/5e5d8f46/network/shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java -- diff --git
spark git commit: [SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'
Repository: spark Updated Branches: refs/heads/master 51b1fe142 - bcb5cdad6 [SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped' Since `sequenceNumberToProcessor` and `stopped` are both protected by the lock `sequenceNumberToProcessor`, `ConcurrentHashMap` and `volatile` is unnecessary. So this PR updated them accordingly. Author: zsxwing zsxw...@gmail.com Closes #3634 from zsxwing/SPARK-3154 and squashes the following commits: 0d087ac [zsxwing] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped' Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bcb5cdad Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bcb5cdad Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bcb5cdad Branch: refs/heads/master Commit: bcb5cdad614d4fce43725dfec3ce88172d2f8c11 Parents: 51b1fe1 Author: zsxwing zsxw...@gmail.com Authored: Mon Dec 8 23:54:15 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Dec 8 23:54:15 2014 -0800 -- .../flume/sink/SparkAvroCallbackHandler.scala | 23 ++-- 1 file changed, 12 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/bcb5cdad/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala -- diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala index 3c656a3..4373be4 100644 --- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala +++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala @@ -16,10 +16,10 @@ */ package org.apache.spark.streaming.flume.sink -import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors} 
+import java.util.concurrent.{CountDownLatch, Executors} import java.util.concurrent.atomic.AtomicLong -import scala.collection.JavaConversions._ +import scala.collection.mutable import org.apache.flume.Channel import org.apache.commons.lang.RandomStringUtils @@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat(Spark Sink Processor Thread - %d).build())) - private val sequenceNumberToProcessor = -new ConcurrentHashMap[CharSequence, TransactionProcessor]() + // Protected by `sequenceNumberToProcessor` + private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]() // This sink will not persist sequence numbers and reuses them if it gets restarted. // So it is possible to commit a transaction which may have been meant for the sink before the // restart. @@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha private val seqBase = RandomStringUtils.randomAlphanumeric(8) private val seqCounter = new AtomicLong(0) - - @volatile private var stopped = false + // Protected by `sequenceNumberToProcessor` + private var stopped = false @volatile private var isTest = false private var testLatch: CountDownLatch = null @@ -131,7 +131,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha * @param success Whether the batch was successful or not. */ private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) { -Option(removeAndGetProcessor(sequenceNumber)).foreach(processor = { +removeAndGetProcessor(sequenceNumber).foreach(processor = { processor.batchProcessed(success) }) } @@ -139,10 +139,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha /** * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak. 
* @param sequenceNumber - * @return The transaction processor for the corresponding batch. Note that this instance is no - * longer tracked and the caller is responsible for that txn processor. + * @return An `Option` of the transaction processor for the corresponding batch. Note that this + * instance is no longer tracked and the caller is responsible for that txn processor. */ - private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = { + private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): + Option[TransactionProcessor] = {
spark git commit: [SPARK-4326] fix unidoc
Repository: spark Updated Branches: refs/heads/master a0fa1ba70 - 4b0c1edfd [SPARK-4326] fix unidoc There are two issues: 1. specifying guava 11.0.2 will cause hashInt not found in unidoc (any reason to force the version here?) 2. unidoc doesn't recognize static class defined in a base class aarondav srowen vanzin Author: Xiangrui Meng m...@databricks.com Closes #3253 from mengxr/SPARK-4326 and squashes the following commits: 53967bf [Xiangrui Meng] fix unidoc Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b0c1edf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b0c1edf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b0c1edf Branch: refs/heads/master Commit: 4b0c1edfdf457cde0e39083c47961184059efded Parents: a0fa1ba Author: Xiangrui Meng m...@databricks.com Authored: Thu Nov 13 13:16:20 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Nov 13 13:16:20 2014 -0800 -- network/common/pom.xml | 1 - network/shuffle/pom.xml | 1 - .../org/apache/spark/network/shuffle/protocol/OpenBlocks.java| 1 + .../apache/spark/network/shuffle/protocol/RegisterExecutor.java | 1 + .../org/apache/spark/network/shuffle/protocol/StreamHandle.java | 4 ++-- .../org/apache/spark/network/shuffle/protocol/UploadBlock.java | 2 ++ 6 files changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/4b0c1edf/network/common/pom.xml -- diff --git a/network/common/pom.xml b/network/common/pom.xml index a6bee7e..2bd0a7d 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -51,7 +51,6 @@ dependency groupIdcom.google.guava/groupId artifactIdguava/artifactId - version11.0.2/version !-- yarn 2.4.0's version -- scopeprovided/scope /dependency http://git-wip-us.apache.org/repos/asf/spark/blob/4b0c1edf/network/shuffle/pom.xml -- diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index be78331..12ff034 100644 --- a/network/shuffle/pom.xml +++ 
b/network/shuffle/pom.xml @@ -52,7 +52,6 @@ dependency groupIdcom.google.guava/groupId artifactIdguava/artifactId - version11.0.2/version !-- yarn 2.4.0's version -- scopeprovided/scope /dependency http://git-wip-us.apache.org/repos/asf/spark/blob/4b0c1edf/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java index 60485ba..62fce9b 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java @@ -23,6 +23,7 @@ import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** Request to read a set of blocks. Returns {@link StreamHandle}. */ public class OpenBlocks extends BlockTransferMessage { http://git-wip-us.apache.org/repos/asf/spark/blob/4b0c1edf/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java index 38acae3..7eb4385 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java @@ -21,6 +21,7 @@ import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** * Initial registration message between an executor and its local shuffle server. 
http://git-wip-us.apache.org/repos/asf/spark/blob/4b0c1edf/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java
spark git commit: [SPARK-4326] fix unidoc
Repository: spark Updated Branches: refs/heads/branch-1.2 c07592e40 - d993a44de [SPARK-4326] fix unidoc There are two issues: 1. specifying guava 11.0.2 will cause hashInt not found in unidoc (any reason to force the version here?) 2. unidoc doesn't recognize static class defined in a base class aarondav srowen vanzin Author: Xiangrui Meng m...@databricks.com Closes #3253 from mengxr/SPARK-4326 and squashes the following commits: 53967bf [Xiangrui Meng] fix unidoc (cherry picked from commit 4b0c1edfdf457cde0e39083c47961184059efded) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d993a44d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d993a44d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d993a44d Branch: refs/heads/branch-1.2 Commit: d993a44de2bf91e93c5ad3f84d35ff4e55f4b2fb Parents: c07592e Author: Xiangrui Meng m...@databricks.com Authored: Thu Nov 13 13:16:20 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Nov 13 13:16:51 2014 -0800 -- network/common/pom.xml | 1 - network/shuffle/pom.xml | 1 - .../org/apache/spark/network/shuffle/protocol/OpenBlocks.java| 1 + .../apache/spark/network/shuffle/protocol/RegisterExecutor.java | 1 + .../org/apache/spark/network/shuffle/protocol/StreamHandle.java | 4 ++-- .../org/apache/spark/network/shuffle/protocol/UploadBlock.java | 2 ++ 6 files changed, 6 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d993a44d/network/common/pom.xml -- diff --git a/network/common/pom.xml b/network/common/pom.xml index a6bee7e..2bd0a7d 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -51,7 +51,6 @@ dependency groupIdcom.google.guava/groupId artifactIdguava/artifactId - version11.0.2/version !-- yarn 2.4.0's version -- scopeprovided/scope /dependency http://git-wip-us.apache.org/repos/asf/spark/blob/d993a44d/network/shuffle/pom.xml 
-- diff --git a/network/shuffle/pom.xml b/network/shuffle/pom.xml index be78331..12ff034 100644 --- a/network/shuffle/pom.xml +++ b/network/shuffle/pom.xml @@ -52,7 +52,6 @@ dependency groupIdcom.google.guava/groupId artifactIdguava/artifactId - version11.0.2/version !-- yarn 2.4.0's version -- scopeprovided/scope /dependency http://git-wip-us.apache.org/repos/asf/spark/blob/d993a44d/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java index 60485ba..62fce9b 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/OpenBlocks.java @@ -23,6 +23,7 @@ import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** Request to read a set of blocks. Returns {@link StreamHandle}. 
*/ public class OpenBlocks extends BlockTransferMessage { http://git-wip-us.apache.org/repos/asf/spark/blob/d993a44d/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java -- diff --git a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java index 38acae3..7eb4385 100644 --- a/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java +++ b/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterExecutor.java @@ -21,6 +21,7 @@ import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; +import org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** * Initial registration message between an executor and its local shuffle server. http://git-wip-us.apache.org/repos/asf/spark/blob/d993a44d/network/shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/StreamHandle.java -- diff --git
spark git commit: [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Repository: spark Updated Branches: refs/heads/master 65083e93d - ef29a9a9a [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have a opened file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin r...@databricks.com Author: Reynold Xin r...@apache.org Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef29a9a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef29a9a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef29a9a9 Branch: refs/heads/master Commit: ef29a9a9aa85468869eb67ca67b66c65f508d0ee Parents: 65083e9 Author: Reynold Xin r...@databricks.com Authored: Tue Nov 11 00:25:31 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Nov 11 00:25:31 2014 -0800 -- .../worker/StandaloneWorkerShuffleService.scala | 2 +- .../spark/shuffle/FileShuffleBlockManager.scala | 8 +- .../shuffle/IndexShuffleBlockManager.scala | 8 +- .../spark/shuffle/sort/SortShuffleManager.scala | 2 +- .../spark/ExternalShuffleServiceSuite.scala | 2 +- .../buffer/FileSegmentManagedBuffer.java| 23 ++-- .../spark/network/buffer/LazyFileRegion.java| 111 +++ .../spark/network/util/TransportConf.java | 17 +++ .../network/ChunkFetchIntegrationSuite.java | 9 +- .../shuffle/ExternalShuffleBlockHandler.java| 5 +- 
.../shuffle/ExternalShuffleBlockManager.java| 13 ++- .../ExternalShuffleBlockManagerSuite.java | 10 +- .../shuffle/ExternalShuffleCleanupSuite.java| 13 ++- .../ExternalShuffleIntegrationSuite.java| 2 +- .../shuffle/ExternalShuffleSecuritySuite.java | 2 +- .../spark/network/yarn/YarnShuffleService.java | 4 +- 16 files changed, 191 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ef29a9a9/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index 88118e2..d044e1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu private val useSasl: Boolean = securityManager.isAuthenticationEnabled() private val transportConf = SparkTransportConf.fromSparkConf(sparkConf) - private val blockHandler = new ExternalShuffleBlockHandler() + private val blockHandler = new ExternalShuffleBlockHandler(transportConf) private val transportContext: TransportContext = { val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler new TransportContext(transportConf, handler) http://git-wip-us.apache.org/repos/asf/spark/blob/ef29a9a9/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index f03e8e4..7de2f9c 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ 
-27,6 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} +import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup import org.apache.spark.storage._ @@ -68,6 +69,8 @@ private[spark] class FileShuffleBlockManager(conf: SparkConf) extends ShuffleBlockManager with
spark git commit: [SPARK-4307] Initialize FileDescriptor lazily in FileRegion.
Repository: spark Updated Branches: refs/heads/branch-1.2 df8242c9b - e9d009dc3 [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. Netty's DefaultFileRegion requires a FileDescriptor in its constructor, which means we need to have an opened file handle. In super large workloads, this could lead to too many open files due to the way these file descriptors are cleaned. This pull request creates a new LazyFileRegion that initializes the FileDescriptor when we are sending data for the first time. Author: Reynold Xin r...@databricks.com Author: Reynold Xin r...@apache.org Closes #3172 from rxin/lazyFD and squashes the following commits: 0bdcdc6 [Reynold Xin] Added reference to Netty's DefaultFileRegion d4564ae [Reynold Xin] Added SparkConf to the ctor argument of IndexShuffleBlockManager. 6ed369e [Reynold Xin] Code review feedback. 04cddc8 [Reynold Xin] [SPARK-4307] Initialize FileDescriptor lazily in FileRegion. (cherry picked from commit ef29a9a9aa85468869eb67ca67b66c65f508d0ee) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9d009dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9d009dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9d009dc Branch: refs/heads/branch-1.2 Commit: e9d009dc348bc06198ed2c9e03f1ba870401e6df Parents: df8242c Author: Reynold Xin r...@databricks.com Authored: Tue Nov 11 00:25:31 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Nov 11 00:25:49 2014 -0800 -- .../worker/StandaloneWorkerShuffleService.scala | 2 +- .../spark/shuffle/FileShuffleBlockManager.scala | 8 +- .../shuffle/IndexShuffleBlockManager.scala | 8 +- .../spark/shuffle/sort/SortShuffleManager.scala | 2 +- .../spark/ExternalShuffleServiceSuite.scala | 2 +- .../buffer/FileSegmentManagedBuffer.java| 23 ++-- .../spark/network/buffer/LazyFileRegion.java| 111 +++ .../spark/network/util/TransportConf.java | 17 +++ 
.../network/ChunkFetchIntegrationSuite.java | 9 +- .../shuffle/ExternalShuffleBlockHandler.java| 5 +- .../shuffle/ExternalShuffleBlockManager.java| 13 ++- .../ExternalShuffleBlockManagerSuite.java | 10 +- .../shuffle/ExternalShuffleCleanupSuite.java| 13 ++- .../ExternalShuffleIntegrationSuite.java| 2 +- .../shuffle/ExternalShuffleSecuritySuite.java | 2 +- .../spark/network/yarn/YarnShuffleService.java | 4 +- 16 files changed, 191 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index 88118e2..d044e1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -40,7 +40,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu private val useSasl: Boolean = securityManager.isAuthenticationEnabled() private val transportConf = SparkTransportConf.fromSparkConf(sparkConf) - private val blockHandler = new ExternalShuffleBlockHandler() + private val blockHandler = new ExternalShuffleBlockHandler(transportConf) private val transportContext: TransportContext = { val handler = if (useSasl) new SaslRpcHandler(blockHandler, securityManager) else blockHandler new TransportContext(transportConf, handler) http://git-wip-us.apache.org/repos/asf/spark/blob/e9d009dc/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala index f03e8e4..7de2f9c 100644 --- 
a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockManager.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.executor.ShuffleWriteMetrics import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer} +import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup import
spark git commit: SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable
Repository: spark Updated Branches: refs/heads/master 6e03de304 - deefd9d73 SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable Author: Prashant Sharma prashan...@imaginea.com Closes #771 from ScrapCodes/deploy-failover-pluggable and squashes the following commits: 29ba440 [Prashant Sharma] fixed a compilation error fef35ec [Prashant Sharma] Code review 57ee6f0 [Prashant Sharma] SPARK-1830 Deploy failover, Make Persistence engine and LeaderAgent Pluggable. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/deefd9d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/deefd9d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/deefd9d7 Branch: refs/heads/master Commit: deefd9d7377a8091a1d184b99066febd0e9f6afd Parents: 6e03de3 Author: Prashant Sharma prashan...@imaginea.com Authored: Tue Nov 11 09:29:48 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Nov 11 09:29:48 2014 -0800 -- .../spark/deploy/master/ApplicationInfo.scala | 1 + .../apache/spark/deploy/master/DriverInfo.scala | 1 + .../master/FileSystemPersistenceEngine.scala| 62 + .../deploy/master/LeaderElectionAgent.scala | 37 +-- .../org/apache/spark/deploy/master/Master.scala | 40 ++- .../spark/deploy/master/PersistenceEngine.scala | 70 +++- .../deploy/master/RecoveryModeFactory.scala | 69 +++ .../apache/spark/deploy/master/WorkerInfo.scala | 1 + .../master/ZooKeeperLeaderElectionAgent.scala | 24 ++- .../master/ZooKeeperPersistenceEngine.scala | 56 ++-- 10 files changed, 211 insertions(+), 150 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 6ba395b..ad7d817 100644 --- 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer import akka.actor.ActorRef +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.ApplicationDescription import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala index 2ac2118..9d3d793 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.master import java.util.Date +import org.apache.spark.annotation.DeveloperApi import org.apache.spark.deploy.DriverDescription import org.apache.spark.util.Utils http://git-wip-us.apache.org/repos/asf/spark/blob/deefd9d7/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index 08a99bb..6ff2aa5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -18,10 +18,12 @@ package org.apache.spark.deploy.master import java.io._ - -import akka.serialization.Serialization +import java.nio.ByteBuffer import org.apache.spark.Logging +import org.apache.spark.serializer.Serializer + +import scala.reflect.ClassTag /** * Stores data in a single on-disk directory with one file per application and worker. 
@@ -32,65 +34,39 @@ import org.apache.spark.Logging */ private[spark] class FileSystemPersistenceEngine( val dir: String, -val serialization: Serialization) +val serialization: Serializer) extends PersistenceEngine with Logging { + val serializer = serialization.newInstance() new File(dir).mkdir() - override def addApplication(app: ApplicationInfo) { -val appFile = new File(dir + File.separator + app_ + app.id) -serializeIntoFile(appFile, app) - } - - override def removeApplication(app: ApplicationInfo) { -new File(dir + File.separator + app_ +
spark git commit: [SPARK-4264] Completion iterator should only invoke callback once
Repository: spark Updated Branches: refs/heads/master b41a39e24 - 23eaf0e12 [SPARK-4264] Completion iterator should only invoke callback once Author: Aaron Davidson aa...@databricks.com Closes #3128 from aarondav/compiter and squashes the following commits: 698e4be [Aaron Davidson] [SPARK-4264] Completion iterator should only invoke callback once Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23eaf0e1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23eaf0e1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23eaf0e1 Branch: refs/heads/master Commit: 23eaf0e12ff221dcca40a79e61b6cc5e7c846cb5 Parents: b41a39e Author: Aaron Davidson aa...@databricks.com Authored: Thu Nov 6 10:45:46 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Nov 6 10:45:46 2014 -0800 -- .../apache/spark/util/CompletionIterator.scala | 5 ++- .../spark/util/CompletionIteratorSuite.scala| 47 2 files changed, 51 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/23eaf0e1/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala index b6a0998..3903102 100644 --- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala @@ -25,10 +25,13 @@ private[spark] // scalastyle:off abstract class CompletionIterator[ +A, +I : Iterator[A]](sub: I) extends Iterator[A] { // scalastyle:on + + private[this] var completed = false def next() = sub.next() def hasNext = { val r = sub.hasNext -if (!r) { +if (!r !completed) { + completed = true completion() } r http://git-wip-us.apache.org/repos/asf/spark/blob/23eaf0e1/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala -- diff --git 
a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala new file mode 100644 index 000..3755d43 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import org.scalatest.FunSuite + +class CompletionIteratorSuite extends FunSuite { + test(basic test) { +var numTimesCompleted = 0 +val iter = List(1, 2, 3).iterator +val completionIter = CompletionIterator[Int, Iterator[Int]](iter, { numTimesCompleted += 1 }) + +assert(completionIter.hasNext) +assert(completionIter.next() === 1) +assert(numTimesCompleted === 0) + +assert(completionIter.hasNext) +assert(completionIter.next() === 2) +assert(numTimesCompleted === 0) + +assert(completionIter.hasNext) +assert(completionIter.next() === 3) +assert(numTimesCompleted === 0) + +assert(!completionIter.hasNext) +assert(numTimesCompleted === 1) + +// SPARK-4264: Calling hasNext should not trigger the completion callback again. 
+assert(!completionIter.hasNext) +assert(numTimesCompleted === 1) + } +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
spark git commit: [SPARK-4264] Completion iterator should only invoke callback once
Repository: spark Updated Branches: refs/heads/branch-1.2 01484455c - aaaeaf939 [SPARK-4264] Completion iterator should only invoke callback once Author: Aaron Davidson aa...@databricks.com Closes #3128 from aarondav/compiter and squashes the following commits: 698e4be [Aaron Davidson] [SPARK-4264] Completion iterator should only invoke callback once (cherry picked from commit 23eaf0e12ff221dcca40a79e61b6cc5e7c846cb5) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aaaeaf93 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aaaeaf93 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aaaeaf93 Branch: refs/heads/branch-1.2 Commit: aaaeaf93902a1954df11fa4982b1c6c7e29f5b8d Parents: 0148445 Author: Aaron Davidson aa...@databricks.com Authored: Thu Nov 6 10:45:46 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Nov 6 10:46:05 2014 -0800 -- .../apache/spark/util/CompletionIterator.scala | 5 ++- .../spark/util/CompletionIteratorSuite.scala| 47 2 files changed, 51 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/aaaeaf93/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala index b6a0998..3903102 100644 --- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala @@ -25,10 +25,13 @@ private[spark] // scalastyle:off abstract class CompletionIterator[ +A, +I : Iterator[A]](sub: I) extends Iterator[A] { // scalastyle:on + + private[this] var completed = false def next() = sub.next() def hasNext = { val r = sub.hasNext -if (!r) { +if (!r !completed) { + completed = true completion() } r 
http://git-wip-us.apache.org/repos/asf/spark/blob/aaaeaf93/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala new file mode 100644 index 000..3755d43 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.util + +import org.scalatest.FunSuite + +class CompletionIteratorSuite extends FunSuite { + test(basic test) { +var numTimesCompleted = 0 +val iter = List(1, 2, 3).iterator +val completionIter = CompletionIterator[Int, Iterator[Int]](iter, { numTimesCompleted += 1 }) + +assert(completionIter.hasNext) +assert(completionIter.next() === 1) +assert(numTimesCompleted === 0) + +assert(completionIter.hasNext) +assert(completionIter.next() === 2) +assert(numTimesCompleted === 0) + +assert(completionIter.hasNext) +assert(completionIter.next() === 3) +assert(numTimesCompleted === 0) + +assert(!completionIter.hasNext) +assert(numTimesCompleted === 1) + +// SPARK-4264: Calling hasNext should not trigger the completion callback again. +assert(!completionIter.hasNext) +assert(numTimesCompleted === 1) + } +} - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4163][Core] Add a backward compatibility test for FetchFailed
Repository: spark Updated Branches: refs/heads/master 1a9c6cdda - 9bdc8412a [SPARK-4163][Core] Add a backward compatibility test for FetchFailed /cc aarondav Author: zsxwing zsxw...@gmail.com Closes #3086 from zsxwing/SPARK-4163-back-comp and squashes the following commits: 21cb2a8 [zsxwing] Add a backward compatibility test for FetchFailed Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8412 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8412 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8412 Branch: refs/heads/master Commit: 9bdc8412a0160e06e8182bd8b2f9bb65b478c590 Parents: 1a9c6cd Author: zsxwing zsxw...@gmail.com Authored: Mon Nov 3 22:40:43 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Nov 3 22:40:43 2014 -0800 -- .../scala/org/apache/spark/util/JsonProtocolSuite.scala | 11 +++ 1 file changed, 11 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8412/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a91c9dd..0103012 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -177,6 +177,17 @@ class JsonProtocolSuite extends FunSuite { deserializedBmRemoved) } + test(FetchFailed backwards compatibility) { +// FetchFailed in Spark 1.1.0 does not have an Message property. 
+val fetchFailed = FetchFailed(BlockManagerId(With or, without you, 15), 17, 18, 19, + ignored) +val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed) + .removeField({ _._1 == Message }) +val expectedFetchFailed = FetchFailed(BlockManagerId(With or, without you, 15), 17, 18, 19, + Unknown reason) +assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent)) + } + test(SparkListenerApplicationStart backwards compatibility) { // SparkListenerApplicationStart in Spark 1.0.0 do not have an appId property. val applicationStart = SparkListenerApplicationStart(test, None, 1L, user) - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI
Repository: spark Updated Branches: refs/heads/master 001acc446 - 76386e1a2 [SPARK-4163][Core][WebUI] Send the fetch failure message back to Web UI This is a PR to send the fetch failure message back to Web UI. Before: ![f1](https://cloud.githubusercontent.com/assets/1000778/4856595/1f036c80-60be-11e4-956f-335147fbccb7.png) ![f2](https://cloud.githubusercontent.com/assets/1000778/4856596/1f11cbea-60be-11e4-8fe9-9f9b2b35c884.png) After (Please ignore the meaning of exception, I threw it in the code directly because it's hard to simulate a fetch failure): ![e1](https://cloud.githubusercontent.com/assets/1000778/4856600/2657ea38-60be-11e4-9f2d-d56c5f900f10.png) ![e2](https://cloud.githubusercontent.com/assets/1000778/4856601/26595008-60be-11e4-912b-2744af786991.png) Author: zsxwing zsxw...@gmail.com Closes #3032 from zsxwing/SPARK-4163 and squashes the following commits: f7e1faf [zsxwing] Discard changes for FetchFailedException and minor modification 4e946f7 [zsxwing] Add e as the cause of SparkException 316767d [zsxwing] Add private[storage] to FetchResult d51b0b6 [zsxwing] Set e as the cause of FetchFailedException b88c919 [zsxwing] Use 'private[storage]' for case classes instead of 'sealed' 62103fd [zsxwing] Update as per review 0c07d1f [zsxwing] Backward-compatible support a3bca65 [zsxwing] Send the fetch failure message back to Web UI Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/76386e1a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/76386e1a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/76386e1a Branch: refs/heads/master Commit: 76386e1a23c55a58c0aeea67820aab2bac71b24b Parents: 001acc4 Author: zsxwing zsxw...@gmail.com Authored: Sun Nov 2 23:20:22 2014 -0800 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Nov 2 23:20:22 2014 -0800 -- .../scala/org/apache/spark/TaskEndReason.scala | 6 +- .../apache/spark/scheduler/DAGScheduler.scala | 4 +- 
.../org/apache/spark/scheduler/JobLogger.scala | 2 +- .../spark/shuffle/FetchFailedException.scala| 16 ++-- .../shuffle/hash/BlockStoreShuffleFetcher.scala | 14 ++-- .../storage/ShuffleBlockFetcherIterator.scala | 82 +--- .../org/apache/spark/util/JsonProtocol.scala| 7 +- .../scala/org/apache/spark/util/Utils.scala | 2 +- .../spark/scheduler/DAGSchedulerSuite.scala | 10 +-- .../ShuffleBlockFetcherIteratorSuite.scala | 8 +- .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 4 +- 12 files changed, 92 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/TaskEndReason.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index 202fba6..f45b463 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -69,11 +69,13 @@ case class FetchFailed( bmAddress: BlockManagerId, // Note that bmAddress can be null shuffleId: Int, mapId: Int, -reduceId: Int) +reduceId: Int, +message: String) extends TaskFailedReason { override def toErrorString: String = { val bmAddressString = if (bmAddress == null) null else bmAddress.toString -sFetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId) +sFetchFailed($bmAddressString, shuffleId=$shuffleId, mapId=$mapId, reduceId=$reduceId, + + smessage=\n$message\n) } } http://git-wip-us.apache.org/repos/asf/spark/blob/76386e1a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index af17b5d..96114c0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1053,7 +1053,7 @@ class 
DAGScheduler( logInfo(Resubmitted + task + , so marking it as still running) stage.pendingTasks += task - case FetchFailed(bmAddress, shuffleId, mapId, reduceId) = + case FetchFailed(bmAddress, shuffleId, mapId, reduceId, failureMessage) = val failedStage = stageIdToStage(task.stageId) val mapStage = shuffleToMapStage(shuffleId) @@ -1063,7 +1063,7 @@ class DAGScheduler( if (runningStages.contains(failedStage)) { logInfo(sMarking $failedStage (${failedStage.name}) as failed + sdue to a fetch failure from
git commit: HOTFIX: Clean up build in network module.
Repository: spark Updated Branches: refs/heads/master 26d31d15f - 0734d0932 HOTFIX: Clean up build in network module. This is currently breaking the package build for some people (including me). This patch does some general clean-up which also fixes the current issue. - Uses consistent artifact naming - Adds sbt support for this module - Changes tests to use scalatest (fixes the original issue[1]) One thing to note, it turns out that scalatest when invoked in the Maven build doesn't successfully detect JUnit Java tests. This is a long-standing issue, I noticed it applies to all of our current test suites as well. I've created SPARK-4159 to fix this. [1] The original issue is that we need to allocate extra memory for the tests, which happens by default in our scalatest configuration. Author: Patrick Wendell pwend...@gmail.com Closes #3025 from pwendell/hotfix and squashes the following commits: faa9053 [Patrick Wendell] HOTFIX: Clean up build in network module. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0734d093 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0734d093 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0734d093 Branch: refs/heads/master Commit: 0734d09320fe37edd3a02718511cda0bda852478 Parents: 26d31d1 Author: Patrick Wendell pwend...@gmail.com Authored: Thu Oct 30 20:15:36 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Oct 30 20:15:36 2014 -0700 -- core/pom.xml | 2 +- network/common/pom.xml | 34 +- project/SparkBuild.scala | 8 +--- 3 files changed, 23 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/core/pom.xml -- diff --git a/core/pom.xml b/core/pom.xml index 8020a2d..6963ce4 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -46,7 +46,7 @@ /dependency dependency groupIdorg.apache.spark/groupId - artifactIdnetwork/artifactId + artifactIdspark-network-common_2.10/artifactId
version${project.version}/version /dependency dependency http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/network/common/pom.xml -- diff --git a/network/common/pom.xml b/network/common/pom.xml index e3b7e32..a33e44b 100644 --- a/network/common/pom.xml +++ b/network/common/pom.xml @@ -27,12 +27,12 @@ /parent groupIdorg.apache.spark/groupId - artifactIdnetwork/artifactId + artifactIdspark-network-common_2.10/artifactId packagingjar/packaging - nameShuffle Streaming Service/name + nameSpark Project Common Network Code/name urlhttp://spark.apache.org//url properties -sbt.project.namenetwork/sbt.project.name +sbt.project.namenetwork-common/sbt.project.name /properties dependencies @@ -60,6 +60,11 @@ scopetest/scope /dependency dependency + groupIdcom.novocode/groupId + artifactIdjunit-interface/artifactId + scopetest/scope +/dependency +dependency groupIdlog4j/groupId artifactIdlog4j/artifactId scopetest/scope @@ -69,25 +74,20 @@ artifactIdmockito-all/artifactId scopetest/scope /dependency +dependency + groupIdorg.scalatest/groupId + artifactIdscalatest_${scala.binary.version}/artifactId + scopetest/scope +/dependency /dependencies - build -outputDirectorytarget/java/classes/outputDirectory -testOutputDirectorytarget/java/test-classes/testOutputDirectory + outputDirectorytarget/scala-${scala.binary.version}/classes/outputDirectory + testOutputDirectorytarget/scala-${scala.binary.version}/test-classes/testOutputDirectory plugins plugin -groupIdorg.apache.maven.plugins/groupId -artifactIdmaven-surefire-plugin/artifactId -version2.17/version -configuration - skipTestsfalse/skipTests - includes -include**/Test*.java/include -include**/*Test.java/include -include**/*Suite.java/include - /includes -/configuration +groupIdorg.scalatest/groupId +artifactIdscalatest-maven-plugin/artifactId /plugin /plugins /build http://git-wip-us.apache.org/repos/asf/spark/blob/0734d093/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 
6d5eb68..7708351 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -31,10 +31,10 @@ object BuildCommons { private val buildLocation = file(.).getAbsoluteFile.getParentFile val allProjects@Seq(bagel, catalyst, core, graphx, hive,
[1/2] [SPARK-4084] Reuse sort key in Sorter
Repository: spark Updated Branches: refs/heads/master 4b55482ab - 84e5da87e http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 6fe1079..066d47c 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import java.lang.{Float = JFloat} +import java.lang.{Float = JFloat, Integer = JInteger} import java.util.{Arrays, Comparator} import org.scalatest.FunSuite @@ -30,11 +30,15 @@ class SorterSuite extends FunSuite { val rand = new XORShiftRandom(123) val data0 = Array.tabulate[Int](1) { i = rand.nextInt() } val data1 = data0.clone() +val data2 = data0.clone() Arrays.sort(data0) new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int) +new Sorter(new KeyReuseIntArraySortDataFormat) + .sort(data2, 0, data2.length, Ordering[IntWrapper]) -data0.zip(data1).foreach { case (x, y) = assert(x === y) } +assert(data0.view === data1.view) +assert(data0.view === data2.view) } test(KVArraySorter) { @@ -61,10 +65,33 @@ class SorterSuite extends FunSuite { } } + /** Runs an experiment several times. */ + def runExperiment(name: String, skip: Boolean = false)(f: = Unit, prepare: () = Unit): Unit = { +if (skip) { + println(sSkipped experiment $name.) 
+ return +} + +val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) +System.gc() + +var i = 0 +var next10: Long = 0 +while (i 10) { + val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) + next10 += time + println(s$name: Took $time ms) + i += 1 +} + +println(s$name: ($firstTry ms first try, ${next10 / 10} ms average)) + } + /** * This provides a simple benchmark for comparing the Sorter with Java internal sorting. * Ideally these would be executed one at a time, each in their own JVM, so their listing - * here is mainly to have the code. + * here is mainly to have the code. Running multiple tests within the same JVM session would + * prevent JIT inlining overridden methods and hence hurt the performance. * * The goal of this code is to sort an array of key-value pairs, where the array physically * has the keys and values alternating. The basic Java sorts work only on the keys, so the @@ -72,96 +99,167 @@ class SorterSuite extends FunSuite { * those, while the Sorter approach can work directly on the input data format. * * Note that the Java implementation varies tremendously between Java 6 and Java 7, when - * the Java sort changed from merge sort to Timsort. + * the Java sort changed from merge sort to TimSort. */ - ignore(Sorter benchmark) { - -/** Runs an experiment several times. */ -def runExperiment(name: String)(f: = Unit): Unit = { - val firstTry = org.apache.spark.util.Utils.timeIt(1)(f) - System.gc() - - var i = 0 - var next10: Long = 0 - while (i 10) { -val time = org.apache.spark.util.Utils.timeIt(1)(f) -next10 += time -println(s$name: Took $time ms) -i += 1 - } - - println(s$name: ($firstTry ms first try, ${next10 / 10} ms average)) -} - + ignore(Sorter benchmark for key-value pairs) { val numElements = 2500 // 25 mil val rand = new XORShiftRandom(123) -val keys = Array.tabulate[JFloat](numElements) { i = - new JFloat(rand.nextFloat()) +// Test our key-value pairs where each element is a Tuple2[Float, Integer]. 
+ +val kvTuples = Array.tabulate(numElements) { i = + (new JFloat(rand.nextFloat()), new JInteger(i)) } -// Test our key-value pairs where each element is a Tuple2[Float, Integer) -val kvTupleArray = Array.tabulate[AnyRef](numElements) { i = - (keys(i / 2): Float, i / 2: Int) +val kvTupleArray = new Array[AnyRef](numElements) +val prepareKvTupleArray = () = { + System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements) } -runExperiment(Tuple-sort using Arrays.sort()) { +runExperiment(Tuple-sort using Arrays.sort())({ Arrays.sort(kvTupleArray, new Comparator[AnyRef] { override def compare(x: AnyRef, y: AnyRef): Int = - Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1) + x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1) }) -} +}, prepareKvTupleArray) // Test our Sorter where each element alternates between Float and Integer, non-primitive -
[2/2] git commit: [SPARK-4084] Reuse sort key in Sorter
[SPARK-4084] Reuse sort key in Sorter Sorter uses generic-typed key for sorting. When data is large, it creates lots of key objects, which is not efficient. We should reuse the key in Sorter for memory efficiency. This change is part of the petabyte sort implementation from rxin. The `Sorter` class was written in Java and marked package-private, so it is only available to `org.apache.spark.util.collection`. I renamed it to `TimSort` and added a simple wrapper around it, still called `Sorter`, in Scala, which is `private[spark]`. The benchmark code is updated, which now resets the array before each run. Here is the result on sorting primitive Int arrays of size 25 million using Sorter: ~~~ [info] - Sorter benchmark for key-value pairs !!! IGNORED !!! Java Arrays.sort() on non-primitive int array: Took 13237 ms Java Arrays.sort() on non-primitive int array: Took 13320 ms Java Arrays.sort() on non-primitive int array: Took 15718 ms Java Arrays.sort() on non-primitive int array: Took 13283 ms Java Arrays.sort() on non-primitive int array: Took 13267 ms Java Arrays.sort() on non-primitive int array: Took 15122 ms Java Arrays.sort() on non-primitive int array: Took 15495 ms Java Arrays.sort() on non-primitive int array: Took 14877 ms Java Arrays.sort() on non-primitive int array: Took 16429 ms Java Arrays.sort() on non-primitive int array: Took 14250 ms Java Arrays.sort() on non-primitive int array: (13878 ms first try, 14499 ms average) Java Arrays.sort() on primitive int array: Took 2683 ms Java Arrays.sort() on primitive int array: Took 2683 ms Java Arrays.sort() on primitive int array: Took 2701 ms Java Arrays.sort() on primitive int array: Took 2746 ms Java Arrays.sort() on primitive int array: Took 2685 ms Java Arrays.sort() on primitive int array: Took 2735 ms Java Arrays.sort() on primitive int array: Took 2669 ms Java Arrays.sort() on primitive int array: Took 2693 ms Java Arrays.sort() on primitive int array: Took 2680 ms Java Arrays.sort() on primitive int array: 
Took 2642 ms Java Arrays.sort() on primitive int array: (2948 ms first try, 2691 ms average) Sorter without key reuse on primitive int array: Took 10732 ms Sorter without key reuse on primitive int array: Took 12482 ms Sorter without key reuse on primitive int array: Took 10718 ms Sorter without key reuse on primitive int array: Took 12650 ms Sorter without key reuse on primitive int array: Took 10747 ms Sorter without key reuse on primitive int array: Took 10783 ms Sorter without key reuse on primitive int array: Took 12721 ms Sorter without key reuse on primitive int array: Took 10604 ms Sorter without key reuse on primitive int array: Took 10622 ms Sorter without key reuse on primitive int array: Took 11843 ms Sorter without key reuse on primitive int array: (11089 ms first try, 11390 ms average) Sorter with key reuse on primitive int array: Took 5141 ms Sorter with key reuse on primitive int array: Took 5298 ms Sorter with key reuse on primitive int array: Took 5066 ms Sorter with key reuse on primitive int array: Took 5164 ms Sorter with key reuse on primitive int array: Took 5203 ms Sorter with key reuse on primitive int array: Took 5274 ms Sorter with key reuse on primitive int array: Took 5186 ms Sorter with key reuse on primitive int array: Took 5159 ms Sorter with key reuse on primitive int array: Took 5164 ms Sorter with key reuse on primitive int array: Took 5078 ms Sorter with key reuse on primitive int array: (5311 ms first try, 5173 ms average) ~~~ So with key reuse, it is faster and less likely to trigger GC. 
Author: Xiangrui Meng m...@databricks.com Author: Reynold Xin r...@apache.org Closes #2937 from mengxr/SPARK-4084 and squashes the following commits: d73c3d0 [Xiangrui Meng] address comments 0b7b682 [Xiangrui Meng] fix mima a72f53c [Xiangrui Meng] update timeIt 38ba50c [Xiangrui Meng] update timeIt 720f731 [Xiangrui Meng] add doc about JIT specialization 78f2879 [Xiangrui Meng] update tests 7de2efd [Xiangrui Meng] update the Sorter benchmark code to be correct 8626356 [Xiangrui Meng] add prepare to timeIt and update testsin SorterSuite 5f0d530 [Xiangrui Meng] update method modifiers of SortDataFormat 6ffbe66 [Xiangrui Meng] rename Sorter to TimSort and add a Scala wrapper that is private[spark] b00db4d [Xiangrui Meng] doc and tests cf94e8a [Xiangrui Meng] renaming 464ddce [Reynold Xin] cherry-pick rxin's commit Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84e5da87 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84e5da87 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84e5da87 Branch: refs/heads/master Commit: 84e5da87e32256ba4f3dee6f8bf532ce88322028 Parents: 4b55482 Author: Xiangrui Meng m...@databricks.com Authored: Tue Oct 28 15:14:41 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Oct 28 15:14:41 2014 -0700 --
git commit: [SPARK-4008] Fix kryo with fold in KryoSerializerSuite
Repository: spark Updated Branches: refs/heads/master 84e5da87e - 1536d7033 [SPARK-4008] Fix kryo with fold in KryoSerializerSuite `zeroValue` will be serialized by `spark.closure.serializer` but `spark.closure.serializer` only supports the default Java serializer. So it must not be `ClassWithoutNoArgConstructor`, which can not be serialized by the Java serializer. This PR changed `zeroValue` to null and updated the test to make it work correctly. Author: zsxwing zsxw...@gmail.com Closes #2856 from zsxwing/SPARK-4008 and squashes the following commits: 51da655 [zsxwing] [SPARK-4008] Fix kryo with fold in KryoSerializerSuite Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1536d703 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1536d703 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1536d703 Branch: refs/heads/master Commit: 1536d70331e9a4f5b5ea9dabfd72592ca1fc8e35 Parents: 84e5da8 Author: zsxwing zsxw...@gmail.com Authored: Tue Oct 28 17:59:10 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Oct 28 17:59:10 2014 -0700 -- .../apache/spark/serializer/KryoSerializerSuite.scala | 13 + 1 file changed, 9 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1536d703/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 64ac6d2..a70f67a 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -201,12 +201,17 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { assert(control.sum === result) } - // TODO: this still doesn't work - ignore(kryo with fold) { + test(kryo with fold) { val control = 1 :: 2 :: Nil +// 
zeroValue must not be a ClassWithoutNoArgConstructor instance because it will be +// serialized by spark.closure.serializer but spark.closure.serializer only supports +// the default Java serializer. val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_)) -.fold(new ClassWithoutNoArgConstructor(10))((t1, t2) = new ClassWithoutNoArgConstructor(t1.x + t2.x)).x -assert(10 + control.sum === result) + .fold(null)((t1, t2) = { + val t1x = if (t1 == null) 0 else t1.x + new ClassWithoutNoArgConstructor(t1x + t2.x) +}).x +assert(control.sum === result) } test(kryo with nonexistent custom registrator should fail) { - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: Add more debug message for ManagedBuffer
Repository: spark Updated Branches: refs/heads/master dab1b0ae2 - e43c72fe0 Add more debug message for ManagedBuffer This is to help debug the error reported at http://apache-spark-user-list.1001560.n3.nabble.com/SQL-queries-fail-in-1-2-0-SNAPSHOT-td15327.html Author: Reynold Xin r...@apache.org Closes #2580 from rxin/buffer-debug and squashes the following commits: 5814292 [Reynold Xin] Logging close() in case close() fails. 323dfec [Reynold Xin] Add more debug message. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e43c72fe Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e43c72fe Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e43c72fe Branch: refs/heads/master Commit: e43c72fe04d4fbf2a108b456d533e641b71b0a2a Parents: dab1b0a Author: Reynold Xin r...@apache.org Authored: Mon Sep 29 12:38:24 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Mon Sep 29 12:38:24 2014 -0700 -- .../apache/spark/network/ManagedBuffer.scala| 43 +--- .../scala/org/apache/spark/util/Utils.scala | 14 +++ 2 files changed, 51 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e43c72fe/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala index e990c1d..a440918 100644 --- a/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala +++ b/core/src/main/scala/org/apache/spark/network/ManagedBuffer.scala @@ -17,15 +17,17 @@ package org.apache.spark.network -import java.io.{FileInputStream, RandomAccessFile, File, InputStream} +import java.io._ import java.nio.ByteBuffer import java.nio.channels.FileChannel import java.nio.channels.FileChannel.MapMode +import scala.util.Try + import com.google.common.io.ByteStreams import io.netty.buffer.{ByteBufInputStream, ByteBuf} -import 
org.apache.spark.util.ByteBufferInputStream +import org.apache.spark.util.{ByteBufferInputStream, Utils} /** @@ -71,18 +73,47 @@ final class FileSegmentManagedBuffer(val file: File, val offset: Long, val lengt try { channel = new RandomAccessFile(file, r).getChannel channel.map(MapMode.READ_ONLY, offset, length) +} catch { + case e: IOException = +Try(channel.size).toOption match { + case Some(fileLen) = +throw new IOException(sError in reading $this (actual file length $fileLen), e) + case None = +throw new IOException(sError in opening $this, e) +} } finally { if (channel != null) { -channel.close() +Utils.tryLog(channel.close()) } } } override def inputStream(): InputStream = { -val is = new FileInputStream(file) -is.skip(offset) -ByteStreams.limit(is, length) +var is: FileInputStream = null +try { + is = new FileInputStream(file) + is.skip(offset) + ByteStreams.limit(is, length) +} catch { + case e: IOException = +if (is != null) { + Utils.tryLog(is.close()) +} +Try(file.length).toOption match { + case Some(fileLen) = +throw new IOException(sError in reading $this (actual file length $fileLen), e) + case None = +throw new IOException(sError in opening $this, e) +} + case e: Throwable = +if (is != null) { + Utils.tryLog(is.close()) +} +throw e +} } + + override def toString: String = s${getClass.getName}($file, $offset, $length) } http://git-wip-us.apache.org/repos/asf/spark/blob/e43c72fe/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 2755887..10d4408 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1304,6 +1304,20 @@ private[spark] object Utils extends Logging { } } + /** Executes the given block in a Try, logging any uncaught exceptions. 
*/ + def tryLog[T](f: = T): Try[T] = { +try { + val res = f + scala.util.Success(res) +} catch { + case ct: ControlThrowable = +throw ct + case t: Throwable = +logError(sUncaught exception in thread ${Thread.currentThread().getName}, t) +scala.util.Failure(t) +} + } + /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */ def isFatalError(e: Throwable):
git commit: [Build] suppress curl/wget progress bars
Repository: spark Updated Branches: refs/heads/master ba5bcadde - 19f61c165 [Build] suppress curl/wget progress bars In the Jenkins console output, `curl` gives us mountains of `#` symbols as it tries to show its download progress. ![noise from curl in Jenkins output](http://i.imgur.com/P2E7yUw.png) I don't think this is useful so I've changed things to suppress these progress bars. If there is actually some use to this, feel free to reject this proposal. Author: Nicholas Chammas nicholas.cham...@gmail.com Closes #2279 from nchammas/trim-test-output and squashes the following commits: 14a720c [Nicholas Chammas] suppress curl/wget progress bars Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19f61c16 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19f61c16 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19f61c16 Branch: refs/heads/master Commit: 19f61c165932059e7ce156da2c71429fa8dc27f0 Parents: ba5bcad Author: Nicholas Chammas nicholas.cham...@gmail.com Authored: Fri Sep 5 21:46:45 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri Sep 5 21:46:45 2014 -0700 -- dev/check-license | 4 ++-- sbt/sbt-launch-lib.bash | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/19f61c16/dev/check-license -- diff --git a/dev/check-license b/dev/check-license index 625ec16..558e038 100755 --- a/dev/check-license +++ b/dev/check-license @@ -32,9 +32,9 @@ acquire_rat_jar () { printf Attempting to fetch rat\n JAR_DL=${JAR}.part if hash curl 2/dev/null; then - (curl --progress-bar ${URL1} $JAR_DL || curl --progress-bar ${URL2} $JAR_DL) mv $JAR_DL $JAR + (curl --silent ${URL1} $JAR_DL || curl --silent ${URL2} $JAR_DL) mv $JAR_DL $JAR elif hash wget 2/dev/null; then - (wget --progress=bar ${URL1} -O $JAR_DL || wget --progress=bar ${URL2} -O $JAR_DL) mv $JAR_DL $JAR + (wget --quiet ${URL1} -O $JAR_DL || wget --quiet ${URL2} -O 
$JAR_DL) mv $JAR_DL $JAR else printf You do not have curl or wget installed, please install rat manually.\n exit -1 http://git-wip-us.apache.org/repos/asf/spark/blob/19f61c16/sbt/sbt-launch-lib.bash -- diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash index c91fecf..fecc3d3 100755 --- a/sbt/sbt-launch-lib.bash +++ b/sbt/sbt-launch-lib.bash @@ -51,9 +51,9 @@ acquire_sbt_jar () { printf Attempting to fetch sbt\n JAR_DL=${JAR}.part if hash curl 2/dev/null; then - (curl --progress-bar ${URL1} ${JAR_DL} || curl --progress-bar ${URL2} ${JAR_DL}) mv ${JAR_DL} ${JAR} + (curl --silent ${URL1} ${JAR_DL} || curl --silent ${URL2} ${JAR_DL}) mv ${JAR_DL} ${JAR} elif hash wget 2/dev/null; then - (wget --progress=bar ${URL1} -O ${JAR_DL} || wget --progress=bar ${URL2} -O ${JAR_DL}) mv ${JAR_DL} ${JAR} + (wget --quiet ${URL1} -O ${JAR_DL} || wget --quiet ${URL2} -O ${JAR_DL}) mv ${JAR_DL} ${JAR} else printf You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n; exit -1 - To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
git commit: [SPARK-2936] Migrate Netty network module from Java to Scala
Repository: spark Updated Branches: refs/heads/master b715aa0c8 - ba28a8fcb [SPARK-2936] Migrate Netty network module from Java to Scala The Netty network module was originally written when Scala 2.9.x had a bug that prevents a pure Scala implementation, and a subset of the files were done in Java. We have since upgraded to Scala 2.10, and can migrate all Java files now to Scala. https://github.com/netty/netty/issues/781 https://github.com/mesos/spark/pull/522 Author: Reynold Xin r...@apache.org Closes #1865 from rxin/netty and squashes the following commits: 332422f [Reynold Xin] Code review feedback ca9 [Reynold Xin] Minor update. 7f1434b [Reynold Xin] [SPARK-2936] Migrate Netty network module from Java to Scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ba28a8fc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ba28a8fc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ba28a8fc Branch: refs/heads/master Commit: ba28a8fcbc3ba432e7ea4d6f0b535450a6ec96c6 Parents: b715aa0 Author: Reynold Xin r...@apache.org Authored: Sun Aug 10 20:36:54 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Aug 10 20:36:54 2014 -0700 -- .../apache/spark/network/netty/FileClient.java | 100 - .../netty/FileClientChannelInitializer.java | 39 --- .../spark/network/netty/FileClientHandler.java | 55 - .../apache/spark/network/netty/FileServer.java | 111 --- .../netty/FileServerChannelInitializer.java | 41 --- .../spark/network/netty/FileServerHandler.java | 83 -- .../spark/network/netty/PathResolver.java | 26 - .../apache/spark/network/netty/FileClient.scala | 85 ++ .../netty/FileClientChannelInitializer.scala| 31 ++ .../spark/network/netty/FileClientHandler.scala | 50 + .../apache/spark/network/netty/FileHeader.scala | 5 +- .../apache/spark/network/netty/FileServer.scala | 91 +++ .../netty/FileServerChannelInitializer.scala| 34 ++ .../spark/network/netty/FileServerHandler.scala | 68 
.../spark/network/netty/PathResolver.scala | 25 + .../spark/network/netty/ShuffleSender.scala | 2 +- 16 files changed, 387 insertions(+), 459 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ba28a8fc/core/src/main/java/org/apache/spark/network/netty/FileClient.java -- diff --git a/core/src/main/java/org/apache/spark/network/netty/FileClient.java b/core/src/main/java/org/apache/spark/network/netty/FileClient.java deleted file mode 100644 index 0d31894..000 --- a/core/src/main/java/org/apache/spark/network/netty/FileClient.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the License); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - *http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.network.netty; - -import java.util.concurrent.TimeUnit; - -import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.oio.OioEventLoopGroup; -import io.netty.channel.socket.oio.OioSocketChannel; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -class FileClient { - - private static final Logger LOG = LoggerFactory.getLogger(FileClient.class.getName()); - - private final FileClientHandler handler; - private Channel channel = null; - private Bootstrap bootstrap = null; - private EventLoopGroup group = null; - private final int connectTimeout; - private final int sendTimeout = 60; // 1 min - - FileClient(FileClientHandler handler, int connectTimeout) { -this.handler = handler; -this.connectTimeout = connectTimeout; - } - - public void init() { -group = new OioEventLoopGroup(); -bootstrap = new Bootstrap(); -bootstrap.group(group) - .channel(OioSocketChannel.class) - .option(ChannelOption.SO_KEEPALIVE, true) - .option(ChannelOption.TCP_NODELAY, true) -
git commit: [Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and local-n-failures consistent
Repository: spark Updated Branches: refs/heads/master f1957e116 - 284771efb [Spark 2557] fix LOCAL_N_REGEX in createTaskScheduler and make local-n and local-n-failures consistent [SPARK-2557](https://issues.apache.org/jira/browse/SPARK-2557) Author: Ye Xianjin advance...@gmail.com Closes #1464 from advancedxy/SPARK-2557 and squashes the following commits: d844d67 [Ye Xianjin] add local-*-n-failures, bad-local-n, bad-local-n-failures test case 3bbc668 [Ye Xianjin] fix LOCAL_N_REGEX regular expression and make local_n_failures accept * as all cores on the computer Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/284771ef Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/284771ef Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/284771ef Branch: refs/heads/master Commit: 284771efbef2d6b22212afd49dd62732a2cf52a8 Parents: f1957e1 Author: Ye Xianjin advance...@gmail.com Authored: Fri Aug 1 00:34:39 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri Aug 1 00:34:39 2014 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 10 ++--- .../SparkContextSchedulerCreationSuite.scala| 23 2 files changed, 30 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/284771ef/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f5a0549..0e51356 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1452,9 +1452,9 @@ object SparkContext extends Logging { /** Creates a task scheduler based on a given master URL. Extracted for testing. 
*/ private def createTaskScheduler(sc: SparkContext, master: String): TaskScheduler = { // Regular expression used for local[N] and local[*] master formats -val LOCAL_N_REGEX = local\[([0-9\*]+)\].r +val LOCAL_N_REGEX = local\[([0-9]+|\*)\].r // Regular expression for local[N, maxRetries], used in tests with failing tasks -val LOCAL_N_FAILURES_REGEX = local\[([0-9]+)\s*,\s*([0-9]+)\].r +val LOCAL_N_FAILURES_REGEX = local\[([0-9]+|\*)\s*,\s*([0-9]+)\].r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally val LOCAL_CLUSTER_REGEX = local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*].r // Regular expression for connecting to Spark deploy clusters @@ -1484,8 +1484,12 @@ object SparkContext extends Logging { scheduler case LOCAL_N_FAILURES_REGEX(threads, maxFailures) = +def localCpuCount = Runtime.getRuntime.availableProcessors() +// local[*, M] means the number of cores on the computer with M failures +// local[N, M] means exactly N threads with M failures +val threadCount = if (threads == *) localCpuCount else threads.toInt val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) -val backend = new LocalBackend(scheduler, threads.toInt) +val backend = new LocalBackend(scheduler, threadCount) scheduler.initialize(backend) scheduler http://git-wip-us.apache.org/repos/asf/spark/blob/284771ef/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala index 67e3be2..4b727e5 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala @@ -68,6 +68,15 @@ class SparkContextSchedulerCreationSuite } } + test(local-*-n-failures) { +val sched = createTaskScheduler(local[* ,2]) +assert(sched.maxTaskFailures === 2) 
+sched.backend match { + case s: LocalBackend = assert(s.totalCores === Runtime.getRuntime.availableProcessors()) + case _ = fail() +} + } + test(local-n-failures) { val sched = createTaskScheduler(local[4, 2]) assert(sched.maxTaskFailures === 2) @@ -77,6 +86,20 @@ class SparkContextSchedulerCreationSuite } } + test(bad-local-n) { +val e = intercept[SparkException] { + createTaskScheduler(local[2*]) +} +assert(e.getMessage.contains(Could not parse Master URL)) + } + + test(bad-local-n-failures) { +val e = intercept[SparkException] { + createTaskScheduler(local[2*,4]) +} +assert(e.getMessage.contains(Could not parse Master URL)) + }
git commit: [SPARK-2764] Simplify daemon.py process structure
Repository: spark Updated Branches: refs/heads/master a38d3c9ef - e8e0fd691 [SPARK-2764] Simplify daemon.py process structure Currently, daemon.py forks a pool of numProcessors subprocesses, and those processes fork themselves again to create the actual Python worker processes that handle data. I think that this extra layer of indirection is unnecessary and adds a lot of complexity. This commit attempts to remove this middle layer of subprocesses by launching the workers directly from daemon.py. See https://github.com/mesos/spark/pull/563 for the original PR that added daemon.py, where I raise some issues with the current design. Author: Josh Rosen joshro...@apache.org Closes #1680 from JoshRosen/pyspark-daemon and squashes the following commits: 5abbcb9 [Josh Rosen] Replace magic number: 4 - EINTR 5495dff [Josh Rosen] Throw IllegalStateException if worker launch fails. b79254d [Josh Rosen] Detect failed fork() calls; improve error logging. 282c2c4 [Josh Rosen] Remove daemon.py exit logging, since it caused problems: 8554536 [Josh Rosen] Fix daemon's shutdown(); log shutdown reason. 4e0fab8 [Josh Rosen] Remove shared-memory exit_flag; don't die on worker death. e9892b4 [Josh Rosen] [WIP] [SPARK-2764] Simplify daemon.py process structure.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e8e0fd69 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e8e0fd69 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e8e0fd69 Branch: refs/heads/master Commit: e8e0fd691a06a2887fdcffb2217b96805ace0cb0 Parents: a38d3c9 Author: Josh Rosen joshro...@apache.org Authored: Fri Aug 1 19:38:21 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri Aug 1 19:38:21 2014 -0700 -- .../spark/api/python/PythonWorkerFactory.scala | 10 +- python/pyspark/daemon.py| 179 --- 2 files changed, 79 insertions(+), 110 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/e8e0fd69/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 759cbe2..15fe8a9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -64,10 +64,16 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Attempt to connect, restart and retry once if it fails try { -new Socket(daemonHost, daemonPort) +val socket = new Socket(daemonHost, daemonPort) +val launchStatus = new DataInputStream(socket.getInputStream).readInt() +if (launchStatus != 0) { + throw new IllegalStateException(Python daemon failed to launch worker) +} +socket } catch { case exc: SocketException = - logWarning(Python daemon unexpectedly quit, attempting to restart) + logWarning(Failed to open socket to Python daemon:, exc) + logWarning(Assuming that daemon unexpectedly quit, attempting to restart) stopDaemon() startDaemon() new Socket(daemonHost, daemonPort) http://git-wip-us.apache.org/repos/asf/spark/blob/e8e0fd69/python/pyspark/daemon.py -- diff --git 
a/python/pyspark/daemon.py b/python/pyspark/daemon.py index 8a5873d..9fde0dd 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -15,64 +15,39 @@ # limitations under the License. # +import numbers import os import signal +import select import socket import sys import traceback -import multiprocessing -from ctypes import c_bool from errno import EINTR, ECHILD from socket import AF_INET, SOCK_STREAM, SOMAXCONN from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN from pyspark.worker import main as worker_main from pyspark.serializers import write_int -try: -POOLSIZE = multiprocessing.cpu_count() -except NotImplementedError: -POOLSIZE = 4 - -exit_flag = multiprocessing.Value(c_bool, False) - - -def should_exit(): -global exit_flag -return exit_flag.value - def compute_real_exit_code(exit_code): # SystemExit's code can be integer or string, but os._exit only accepts integers -import numbers if isinstance(exit_code, numbers.Integral): return exit_code else: return 1 -def worker(listen_sock): +def worker(sock): + +Called by a worker process after the fork(). + # Redirect stdout to stderr os.dup2(2, 1) sys.stdout
git commit: SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant
Repository: spark Updated Branches: refs/heads/master 1b10b8114 - 9564f8548 SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant Author: Sandy Ryza sa...@cloudera.com Closes #1474 from sryza/sandy-spark-2564 and squashes the following commits: 35b8388 [Sandy Ryza] Fix compile error on upmerge 7b985fb [Sandy Ryza] Fix test compile error 43f79e6 [Sandy Ryza] SPARK-2564. ShuffleReadMetrics.totalBlocksRead is redundant Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9564f854 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9564f854 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9564f854 Branch: refs/heads/master Commit: 9564f8548917f563930d5e87911a304bf206d26e Parents: 1b10b81 Author: Sandy Ryza sa...@cloudera.com Authored: Sun Jul 20 14:45:34 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Jul 20 14:45:34 2014 -0700 -- core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala | 3 +-- .../org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala | 1 - .../scala/org/apache/spark/storage/BlockFetcherIterator.scala| 4 +--- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 -- .../src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala | 3 --- 5 files changed, 2 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 5d59e00..21fe643 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -99,7 +99,6 @@ class TaskMetrics extends Serializable { existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched 
existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched -existingMetrics.totalBlocksFetched += newMetrics.totalBlocksFetched existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead case None = _shuffleReadMetrics = Some(newMetrics) @@ -149,7 +148,7 @@ class ShuffleReadMetrics extends Serializable { /** * Number of blocks fetched in this shuffle by this task (remote or local) */ - var totalBlocksFetched: Int = _ + def totalBlocksFetched: Int = remoteBlocksFetched + localBlocksFetched /** * Number of remote blocks fetched in this shuffle by this task http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala -- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 3795994..9978882 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -81,7 +81,6 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { shuffleMetrics.shuffleFinishTime = System.currentTimeMillis shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead - shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics) http://git-wip-us.apache.org/repos/asf/spark/blob/9564f854/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 2f0296c..69905a9 100644 --- 
a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -46,7 +46,6 @@ import org.apache.spark.util.Utils private[storage] trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() - def totalBlocks: Int def numLocalBlocks: Int def numRemoteBlocks: Int def fetchWaitTime: Long @@ -192,7 +191,7 @@ object BlockFetcherIterator { } } logInfo(Getting +
git commit: [SPARK-2485][SQL] Lock usage of hive client.
Repository: spark Updated Branches: refs/heads/branch-1.0 0e2727959 - 53a6399e5 [SPARK-2485][SQL] Lock usage of hive client. Author: Michael Armbrust mich...@databricks.com Closes #1412 from marmbrus/lockHiveClient and squashes the following commits: 4bc9d5a [Michael Armbrust] protected[hive] 22e9177 [Michael Armbrust] Add comments. 7aa8554 [Michael Armbrust] Don't lock on hive's object. a6edc5f [Michael Armbrust] Lock usage of hive client. (cherry picked from commit c7c7ac83392b10abb011e6aead1bf92e7c73695e) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53a6399e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53a6399e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53a6399e Branch: refs/heads/branch-1.0 Commit: 53a6399e54b1843528e1a0b45de5996694747957 Parents: 0e27279 Author: Michael Armbrust mich...@databricks.com Authored: Tue Jul 15 00:13:51 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Jul 15 00:14:07 2014 -0700 -- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/53a6399e/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 28ccd6d..c64cad3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -44,14 +44,15 @@ import scala.collection.JavaConversions._ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { import HiveMetastoreTypes._ - val client = Hive.get(hive.hiveconf) + /** Connection to hive metastore. 
Usages should lock on `this`. */ + protected[hive] val client = Hive.get(hive.hiveconf) val caseSensitive: Boolean = false def lookupRelation( db: Option[String], tableName: String, - alias: Option[String]): LogicalPlan = { + alias: Option[String]): LogicalPlan = synchronized { val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) val table = client.getTable(databaseName, tblName)
git commit: [SPARK-2485][SQL] Lock usage of hive client.
Repository: spark Updated Branches: refs/heads/master c6d75745d - c7c7ac833 [SPARK-2485][SQL] Lock usage of hive client. Author: Michael Armbrust mich...@databricks.com Closes #1412 from marmbrus/lockHiveClient and squashes the following commits: 4bc9d5a [Michael Armbrust] protected[hive] 22e9177 [Michael Armbrust] Add comments. 7aa8554 [Michael Armbrust] Don't lock on hive's object. a6edc5f [Michael Armbrust] Lock usage of hive client. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c7c7ac83 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c7c7ac83 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c7c7ac83 Branch: refs/heads/master Commit: c7c7ac83392b10abb011e6aead1bf92e7c73695e Parents: c6d7574 Author: Michael Armbrust mich...@databricks.com Authored: Tue Jul 15 00:13:51 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Jul 15 00:13:51 2014 -0700 -- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c7c7ac83/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala -- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index f830688..8db60d3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -43,14 +43,15 @@ import scala.collection.JavaConversions._ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { import HiveMetastoreTypes._ - val client = Hive.get(hive.hiveconf) + /** Connection to hive metastore. Usages should lock on `this`. 
*/ + protected[hive] val client = Hive.get(hive.hiveconf) val caseSensitive: Boolean = false def lookupRelation( db: Option[String], tableName: String, - alias: Option[String]): LogicalPlan = { + alias: Option[String]): LogicalPlan = synchronized { val (dbName, tblName) = processDatabaseAndTableName(db, tableName) val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase) val table = client.getTable(databaseName, tblName)
git commit: Reformat multi-line closure argument.
Repository: spark Updated Branches: refs/heads/master 04b01bb10 - cb09e93c1 Reformat multi-line closure argument. Author: William Benton wi...@redhat.com Closes #1419 from willb/reformat-2486 and squashes the following commits: 2676231 [William Benton] Reformat multi-line closure argument. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb09e93c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb09e93c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb09e93c Branch: refs/heads/master Commit: cb09e93c1d7ef9c8f0a1abe4e659783c74993a4e Parents: 04b01bb Author: William Benton wi...@redhat.com Authored: Tue Jul 15 09:13:39 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Jul 15 09:13:39 2014 -0700 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/cb09e93c/core/src/main/scala/org/apache/spark/util/Utils.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index d72c97b..10c33d6 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -809,11 +809,12 @@ private[spark] object Utils extends Logging { */ def getCallSite: CallSite = { val trace = Thread.currentThread.getStackTrace() - .filterNot((ste:StackTraceElement) = + .filterNot { ste:StackTraceElement = // When running under some profilers, the current stack trace might contain some bogus // frames. This is intended to ensure that we don't crash in these situations by // ignoring any frames that we can't examine. 
-(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace"))) +(ste == null || ste.getMethodName == null || ste.getMethodName.contains("getStackTrace")) + } // Keep crawling up the stack trace until we find the first function not inside of the spark // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
git commit: [SPARK-2403] Catch all errors during serialization in DAGScheduler
Repository: spark Updated Branches: refs/heads/branch-1.0 4bf8ddaee - 3bd32f023 [SPARK-2403] Catch all errors during serialization in DAGScheduler https://issues.apache.org/jira/browse/SPARK-2403 Spark hangs for us whenever we forget to register a class with Kryo. This should be a simple fix for that. But let me know if you have a better suggestion. I did not write a new test for this. It would be pretty complicated and I'm not sure it's worthwhile for such a simple change. Let me know if you disagree. Author: Daniel Darabos darabos.dan...@gmail.com Closes #1329 from darabos/spark-2403 and squashes the following commits: 3aceaad [Daniel Darabos] Print full stack trace for miscellaneous exceptions during serialization. 52c22ba [Daniel Darabos] Only catch NonFatal exceptions. 361e962 [Daniel Darabos] Catch all errors during serialization in DAGScheduler. (cherry picked from commit c8a2313cdf825e0191680a423d17619b5504ff89) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bd32f02 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bd32f02 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bd32f02 Branch: refs/heads/branch-1.0 Commit: 3bd32f023d9bd83da7afab37fffe614064df3e6b Parents: 4bf8dda Author: Daniel Darabos darabos.dan...@gmail.com Authored: Tue Jul 8 10:43:46 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Jul 8 10:44:02 2014 -0700 -- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala| 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3bd32f02/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index d15aaa3..a9fd7e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -26,6 +26,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag +import scala.util.control.NonFatal import akka.actor._ import akka.actor.OneForOneStrategy @@ -771,6 +772,10 @@ class DAGScheduler( abortStage(stage, "Task not serializable: " + e.toString) runningStages -= stage return +case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo. + abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}") + runningStages -= stage + return } logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
git commit: [SPARK-2403] Catch all errors during serialization in DAGScheduler
Repository: spark Updated Branches: refs/heads/master cc3e0a14d - c8a2313cd [SPARK-2403] Catch all errors during serialization in DAGScheduler https://issues.apache.org/jira/browse/SPARK-2403 Spark hangs for us whenever we forget to register a class with Kryo. This should be a simple fix for that. But let me know if you have a better suggestion. I did not write a new test for this. It would be pretty complicated and I'm not sure it's worthwhile for such a simple change. Let me know if you disagree. Author: Daniel Darabos darabos.dan...@gmail.com Closes #1329 from darabos/spark-2403 and squashes the following commits: 3aceaad [Daniel Darabos] Print full stack trace for miscellaneous exceptions during serialization. 52c22ba [Daniel Darabos] Only catch NonFatal exceptions. 361e962 [Daniel Darabos] Catch all errors during serialization in DAGScheduler. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8a2313c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8a2313c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8a2313c Branch: refs/heads/master Commit: c8a2313cdf825e0191680a423d17619b5504ff89 Parents: cc3e0a1 Author: Daniel Darabos darabos.dan...@gmail.com Authored: Tue Jul 8 10:43:46 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue Jul 8 10:43:46 2014 -0700 -- .../main/scala/org/apache/spark/scheduler/DAGScheduler.scala| 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c8a2313c/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 81c136d..f72bfde 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -26,6 +26,7 @@ import scala.concurrent.Await import 
scala.concurrent.duration._ import scala.language.postfixOps import scala.reflect.ClassTag +import scala.util.control.NonFatal import akka.actor._ import akka.actor.OneForOneStrategy @@ -768,6 +769,10 @@ class DAGScheduler( abortStage(stage, "Task not serializable: " + e.toString) runningStages -= stage return +case NonFatal(e) => // Other exceptions, such as IllegalArgumentException from Kryo. + abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}") + runningStages -= stage + return } logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
git commit: [SPARK-2324] SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error
Repository: spark Updated Branches: refs/heads/master bc7041a42 - 3bbeca648 [SPARK-2324] SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error The spark.local.dir is configured as a list of multiple paths as follows /data1/sparkenv/local,/data2/sparkenv/local. If the disk data2 of the driver node has error, the application will exit since DiskBlockManager exits directly at createLocalDirs. If the disk data2 of the worker node has error, the executor will exit either. DiskBlockManager should not exit directly at createLocalDirs if one of spark.local.dir has error. Since spark.local.dir has multiple paths, a problem should not affect the overall situation. I think DiskBlockManager could ignore the bad directory at createLocalDirs. Author: yantangzhai tyz0...@163.com Closes #1274 from YanTangZhai/SPARK-2324 and squashes the following commits: 609bf48 [yantangzhai] [SPARK-2324] SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error df08673 [yantangzhai] [SPARK-2324] SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3bbeca64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3bbeca64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3bbeca64 Branch: refs/heads/master Commit: 3bbeca648985b32bdf1eedef779cb2817eb6dfa4 Parents: bc7041a Author: yantangzhai tyz0...@163.com Authored: Thu Jul 3 10:14:35 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Jul 3 10:14:35 2014 -0700 -- .../org/apache/spark/storage/DiskBlockManager.scala | 16 +++- 1 file changed, 11 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3bbeca64/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 2ec46d4..673fc19 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -44,6 +44,10 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. */ private val localDirs: Array[File] = createLocalDirs() + if (localDirs.isEmpty) { +logError(Failed to create any local dir.) +System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) + } private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) private var shuffleSender : ShuffleSender = null @@ -116,7 +120,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD private def createLocalDirs(): Array[File] = { logDebug(sCreating local directories at root dirs '$rootDirs') val dateFormat = new SimpleDateFormat(MMddHHmmss) -rootDirs.split(,).map { rootDir = +rootDirs.split(,).flatMap { rootDir = var foundLocalDir = false var localDir: File = null var localDirId: String = null @@ -136,11 +140,13 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD } } if (!foundLocalDir) { -logError(sFailed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir) -System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) +logError(sFailed $MAX_DIR_CREATION_ATTEMPTS attempts to create local dir in $rootDir. + + Ignoring this directory.) +None + } else { +logInfo(sCreated local directory at $localDir) +Some(localDir) } - logInfo(sCreated local directory at $localDir) - localDir } }
git commit: [SPARK] Fix NPE for ExternalAppendOnlyMap
Repository: spark Updated Branches: refs/heads/master 3bbeca648 - c48053773 [SPARK] Fix NPE for ExternalAppendOnlyMap It did not handle null keys very gracefully before. Author: Andrew Or andrewo...@gmail.com Closes #1288 from andrewor14/fix-external and squashes the following commits: 312b8d8 [Andrew Or] Abstract key hash code ed5adf9 [Andrew Or] Fix NPE for ExternalAppendOnlyMap Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4805377 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4805377 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4805377 Branch: refs/heads/master Commit: c480537739f9329ebfd580f09c69778e6c976366 Parents: 3bbeca6 Author: Andrew Or andrewo...@gmail.com Authored: Thu Jul 3 10:26:50 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Jul 3 10:26:50 2014 -0700 -- .../util/collection/ExternalAppendOnlyMap.scala | 30 ++-- .../collection/ExternalAppendOnlyMapSuite.scala | 27 -- 2 files changed, 46 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c4805377/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 288badd..292d096 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C]( if (it.hasNext) { var kc = it.next() kcPairs += kc -val minHash = kc._1.hashCode() +val minHash = getKeyHashCode(kc) while (it.hasNext it.head._1.hashCode() == minHash) { kc = it.next() kcPairs += kc @@ -294,8 +294,9 @@ class ExternalAppendOnlyMap[K, V, C]( // Select a key from the StreamBuffer that holds the lowest key hash val minBuffer = 
mergeHeap.dequeue() val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) - var (minKey, minCombiner) = minPairs.remove(0) - assert(minKey.hashCode() == minHash) + val minPair = minPairs.remove(0) + var (minKey, minCombiner) = minPair + assert(getKeyHashCode(minPair) == minHash) // For all other streams that may have this key (i.e. have the same minimum key hash), // merge in the corresponding value (if any) from that stream @@ -327,15 +328,16 @@ class ExternalAppendOnlyMap[K, V, C]( * StreamBuffers are ordered by the minimum key hash found across all of their own pairs. */ private class StreamBuffer( -val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)]) +val iterator: BufferedIterator[(K, C)], +val pairs: ArrayBuffer[(K, C)]) extends Comparable[StreamBuffer] { def isEmpty = pairs.length == 0 // Invalid if there are no more pairs in this stream - def minKeyHash = { + def minKeyHash: Int = { assert(pairs.length 0) -pairs.head._1.hashCode() +getKeyHashCode(pairs.head) } override def compareTo(other: StreamBuffer): Int = { @@ -422,10 +424,22 @@ class ExternalAppendOnlyMap[K, V, C]( } private[spark] object ExternalAppendOnlyMap { + + /** + * Return the key hash code of the given (key, combiner) pair. + * If the key is null, return a special hash code. + */ + private def getKeyHashCode[K, C](kc: (K, C)): Int = { +if (kc._1 == null) 0 else kc._1.hashCode() + } + + /** + * A comparator for (key, combiner) pairs based on their key hash codes. 
+ */ private class KCComparator[K, C] extends Comparator[(K, C)] { def compare(kc1: (K, C), kc2: (K, C)): Int = { - val hash1 = kc1._1.hashCode() - val hash2 = kc2._1.hashCode() + val hash1 = getKeyHashCode(kc1) + val hash2 = getKeyHashCode(kc2) if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1 } } http://git-wip-us.apache.org/repos/asf/spark/blob/c4805377/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index deb7809..4288229 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@
git commit: [SPARK] Fix NPE for ExternalAppendOnlyMap
Repository: spark Updated Branches: refs/heads/branch-1.0 87b74a9bf - fdee6ee06 [SPARK] Fix NPE for ExternalAppendOnlyMap It did not handle null keys very gracefully before. Author: Andrew Or andrewo...@gmail.com Closes #1288 from andrewor14/fix-external and squashes the following commits: 312b8d8 [Andrew Or] Abstract key hash code ed5adf9 [Andrew Or] Fix NPE for ExternalAppendOnlyMap (cherry picked from commit c480537739f9329ebfd580f09c69778e6c976366) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdee6ee0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdee6ee0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdee6ee0 Branch: refs/heads/branch-1.0 Commit: fdee6ee0655f04e9a0d3a66f2e8df5486a5ea032 Parents: 87b74a9 Author: Andrew Or andrewo...@gmail.com Authored: Thu Jul 3 10:26:50 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Jul 3 10:28:06 2014 -0700 -- .../util/collection/ExternalAppendOnlyMap.scala | 30 ++-- .../collection/ExternalAppendOnlyMapSuite.scala | 27 -- 2 files changed, 46 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/fdee6ee0/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 288badd..292d096 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -252,7 +252,7 @@ class ExternalAppendOnlyMap[K, V, C]( if (it.hasNext) { var kc = it.next() kcPairs += kc -val minHash = kc._1.hashCode() +val minHash = getKeyHashCode(kc) while (it.hasNext it.head._1.hashCode() == minHash) { kc = it.next() kcPairs += kc @@ -294,8 +294,9 
@@ class ExternalAppendOnlyMap[K, V, C]( // Select a key from the StreamBuffer that holds the lowest key hash val minBuffer = mergeHeap.dequeue() val (minPairs, minHash) = (minBuffer.pairs, minBuffer.minKeyHash) - var (minKey, minCombiner) = minPairs.remove(0) - assert(minKey.hashCode() == minHash) + val minPair = minPairs.remove(0) + var (minKey, minCombiner) = minPair + assert(getKeyHashCode(minPair) == minHash) // For all other streams that may have this key (i.e. have the same minimum key hash), // merge in the corresponding value (if any) from that stream @@ -327,15 +328,16 @@ class ExternalAppendOnlyMap[K, V, C]( * StreamBuffers are ordered by the minimum key hash found across all of their own pairs. */ private class StreamBuffer( -val iterator: BufferedIterator[(K, C)], val pairs: ArrayBuffer[(K, C)]) +val iterator: BufferedIterator[(K, C)], +val pairs: ArrayBuffer[(K, C)]) extends Comparable[StreamBuffer] { def isEmpty = pairs.length == 0 // Invalid if there are no more pairs in this stream - def minKeyHash = { + def minKeyHash: Int = { assert(pairs.length 0) -pairs.head._1.hashCode() +getKeyHashCode(pairs.head) } override def compareTo(other: StreamBuffer): Int = { @@ -422,10 +424,22 @@ class ExternalAppendOnlyMap[K, V, C]( } private[spark] object ExternalAppendOnlyMap { + + /** + * Return the key hash code of the given (key, combiner) pair. + * If the key is null, return a special hash code. + */ + private def getKeyHashCode[K, C](kc: (K, C)): Int = { +if (kc._1 == null) 0 else kc._1.hashCode() + } + + /** + * A comparator for (key, combiner) pairs based on their key hash codes. 
+ */ private class KCComparator[K, C] extends Comparator[(K, C)] { def compare(kc1: (K, C), kc2: (K, C)): Int = { - val hash1 = kc1._1.hashCode() - val hash2 = kc2._1.hashCode() + val hash1 = getKeyHashCode(kc1) + val hash2 = getKeyHashCode(kc2) if (hash1 < hash2) -1 else if (hash1 == hash2) 0 else 1 } } http://git-wip-us.apache.org/repos/asf/spark/blob/fdee6ee0/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index deb7809..4288229 100644 ---
git commit: [SPARK-1097] Workaround Hadoop conf ConcurrentModification issue
Repository: spark Updated Branches: refs/heads/master fdc4c112e - 5fa0a0576 [SPARK-1097] Workaround Hadoop conf ConcurrentModification issue Workaround Hadoop conf ConcurrentModification issue Author: Raymond Liu raymond@intel.com Closes #1273 from colorant/hadoopRDD and squashes the following commits: 994e98b [Raymond Liu] Address comments e2cda3d [Raymond Liu] Workaround Hadoop conf ConcurrentModification issue Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5fa0a057 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5fa0a057 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5fa0a057 Branch: refs/heads/master Commit: 5fa0a05763ab1d527efe20e3b10539ac5ffc36de Parents: fdc4c11 Author: Raymond Liu raymond@intel.com Authored: Thu Jul 3 19:24:22 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu Jul 3 19:24:22 2014 -0700 -- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/5fa0a057/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 98dcbf4..0410285 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -141,8 +141,8 @@ class HadoopRDD[K, V]( // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. 
// synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - broadcastedConf.synchronized { -val newJobConf = new JobConf(broadcastedConf.value.value) + conf.synchronized { +val newJobConf = new JobConf(conf) initLocalJobConfFuncOpt.map(f = f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) newJobConf
git commit: [SPARK-1394] Remove SIGCHLD handler in worker subprocess
Repository: spark Updated Branches: refs/heads/master b8f2e13ae - 3c104c79d [SPARK-1394] Remove SIGCHLD handler in worker subprocess It should not be the responsibility of the worker subprocess, which does not intentionally fork, to try to clean up child processes. Doing so is complex and interferes with operations such as platform.system(). If it is desirable to have tighter control over subprocesses, then namespaces should be used and it should be the manager's responsibility to handle cleanup. Author: Matthew Farrellee m...@redhat.com Closes #1247 from mattf/SPARK-1394 and squashes the following commits: c36f308 [Matthew Farrellee] [SPARK-1394] Remove SIGCHLD handler in worker subprocess Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3c104c79 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3c104c79 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3c104c79 Branch: refs/heads/master Commit: 3c104c79d24425786cec0034f269ba19cf465b31 Parents: b8f2e13 Author: Matthew Farrellee m...@redhat.com Authored: Sat Jun 28 18:39:27 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sat Jun 28 18:39:27 2014 -0700 -- python/pyspark/daemon.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/3c104c79/python/pyspark/daemon.py -- diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index b2f226a..5eb1c63 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -103,6 +103,7 @@ def worker(listen_sock): if os.fork() == 0: # Leave the worker pool signal.signal(SIGHUP, SIG_DFL) +signal.signal(SIGCHLD, SIG_DFL) listen_sock.close() # Read the socket using fdopen instead of socket.makefile() because the latter # seems to be very slow; note that we need to dup() the file descriptor because
git commit: [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.
Repository: spark Updated Branches: refs/heads/branch-1.0 64316af5a - 67bffd3c7 [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts. SPARK-1112: This is a more conservative version of #1132 that doesn't change around the actor system initialization on the executor. Instead we just directly read the current frame size limit from the ActorSystem. SPARK-2156: This uses the same fix as in #1132. Author: Patrick Wendell pwend...@gmail.com Closes #1172 from pwendell/akka-10-fix and squashes the following commits: d56297e [Patrick Wendell] Set limit in LocalBackend to preserve test expectations 9f5ed19 [Patrick Wendell] [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67bffd3c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67bffd3c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67bffd3c Branch: refs/heads/branch-1.0 Commit: 67bffd3c7ee8e9e3395e714e470459f09d19e66d Parents: 64316af Author: Patrick Wendell pwend...@gmail.com Authored: Sun Jun 22 19:31:15 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Jun 22 19:31:15 2014 -0700 -- .../executor/CoarseGrainedExecutorBackend.scala | 8 ++-- .../org/apache/spark/executor/Executor.scala| 8 +++- .../apache/spark/executor/ExecutorBackend.scala | 3 +++ .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +- .../spark/scheduler/local/LocalBackend.scala| 6 +- .../scala/org/apache/spark/util/AkkaUtils.scala | 3 +++ .../apache/spark/MapOutputTrackerSuite.scala| 21 +++- 7 files changed, 33 insertions(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 2279d77..70c1f4c 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, -cores: Int) +cores: Int, +actorSystem: ActorSystem) extends Actor with ExecutorBackend with Logging { @@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend( override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { driver ! StatusUpdate(executorId, taskId, state, data) } + + override def akkaFrameSize() = actorSystem.settings.config.getBytes( +akka.remote.netty.tcp.maximum-frame-size) } private[spark] object CoarseGrainedExecutorBackend { @@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend { val sparkHostPort = hostname + : + boundPort actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, -sparkHostPort, cores), +sparkHostPort, cores, actorSystem), name = Executor) workerUrl.foreach { url = http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/Executor.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index baee7a2..214a8c8 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -97,10 +97,6 @@ private[spark] class Executor( private val urlClassLoader = createClassLoader() private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader) - // Akka's message frame size. If task result is bigger than this, we use the block manager - // to send the result back. 
- private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) - // Start worker thread pool val threadPool = Utils.newDaemonCachedThreadPool(Executor task launch worker) @@ -211,8 +207,10 @@ private[spark] class Executor( task.metrics.getOrElse(null)) val serializedDirectResult = ser.serialize(directResult) logInfo(Serialized size of result for + taskId + is + serializedDirectResult.limit) + val serializedResult = { - if (serializedDirectResult.limit = akkaFrameSize - 1024) { + if (serializedDirectResult.limit = execBackend.akkaFrameSize() - +
git commit: [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors
Repository: spark Updated Branches: refs/heads/master 269fc62b2 - ca5d9d43b [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors There seem to be 2 issues. 1. When the job is done, the driver asks the executor to shut down. However, this clean exit was assigned the FAILED executor state by the Worker. I introduced the EXITED executor state for executors that voluntarily exit (both normal and abnormal exits, depending on the exit code). 2. When the Master is notified that an executor has exited, it launches another one to replace it, regardless of the reason why the executor exited. When the reason was that the job had finished, the unnecessary replacement was subsequently killed when the App disassociated. This launching and killing of unnecessary executors shows up in the log and is confusing to users. I added a check of the executor exit status to avoid launching (and subsequently killing) unnecessary replacements when executors exit cleanly. One could ask the scheduler to tell the Master that the job is done so that the Master wouldn't launch the replacement executor. However, there is a race condition between the App telling the Master the job is done and the Worker telling the Master an executor has exited. There is no guarantee the former will happen before the latter. Instead, I chose to check the exit code when an executor exits. If the exit code is 0, I assume the executor has been asked to shut down by the driver, and the Master will not launch replacements. Due to the race condition, it could also happen (although it didn't on my local cluster) that the Master detects the App disassociation event before the executor exits by itself. In such cases, the executor will be rightfully killed and labeled as KILLED, while the App state will show FINISHED.
Author: Kan Zhang kzh...@apache.org Closes #306 from kanzhang/SPARK-1118 and squashes the following commits: cb0cc86 [Kan Zhang] [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca5d9d43 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca5d9d43 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca5d9d43 Branch: refs/heads/master Commit: ca5d9d43b93abd279079b3be8a06fdd78c595510 Parents: 269fc62 Author: Kan Zhang kzh...@apache.org Authored: Sun Jun 15 14:55:34 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Jun 15 14:55:34 2014 -0700 -- .../main/scala/org/apache/spark/deploy/ExecutorState.scala| 4 ++-- .../main/scala/org/apache/spark/deploy/master/Master.scala| 5 +++-- .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 7 +++ 3 files changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/ca5d9d43/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala index 37dfa7f..9f34d01 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala @@ -19,9 +19,9 @@ package org.apache.spark.deploy private[spark] object ExecutorState extends Enumeration { - val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value + val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value type ExecutorState = Value - def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state) + def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state) } 
http://git-wip-us.apache.org/repos/asf/spark/blob/ca5d9d43/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6dec30..33ffcbd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -303,10 +303,11 @@ private[spark] class Master( appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) +val normalExit = exitStatus.exists(_ == 0) // Only retry certain number of times so we don't go into an infinite loop. -if (appInfo.incrementRetryCount ApplicationState.MAX_NUM_RETRY) { +if (!normalExit appInfo.incrementRetryCount ApplicationState.MAX_NUM_RETRY) { schedule() -} else { +} else if (!normalExit) { logError(Application %s with ID %s failed
git commit: [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors
Repository: spark Updated Branches: refs/heads/branch-1.0 868cf421e - 609e5ff20 [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors There seem to be 2 issues. 1. When the job is done, the driver asks the executor to shut down. However, this clean exit was assigned the FAILED executor state by the Worker. I introduced the EXITED executor state for executors that voluntarily exit (both normal and abnormal exits, depending on the exit code). 2. When the Master is notified that an executor has exited, it launches another one to replace it, regardless of the reason why the executor exited. When the reason was that the job had finished, the unnecessary replacement was subsequently killed when the App disassociated. This launching and killing of unnecessary executors shows up in the log and is confusing to users. I added a check of the executor exit status to avoid launching (and subsequently killing) unnecessary replacements when executors exit cleanly. One could ask the scheduler to tell the Master that the job is done so that the Master wouldn't launch the replacement executor. However, there is a race condition between the App telling the Master the job is done and the Worker telling the Master an executor has exited. There is no guarantee the former will happen before the latter. Instead, I chose to check the exit code when an executor exits. If the exit code is 0, I assume the executor has been asked to shut down by the driver, and the Master will not launch replacements. Due to the race condition, it could also happen (although it didn't on my local cluster) that the Master detects the App disassociation event before the executor exits by itself. In such cases, the executor will be rightfully killed and labeled as KILLED, while the App state will show FINISHED.
Author: Kan Zhang kzh...@apache.org Closes #306 from kanzhang/SPARK-1118 and squashes the following commits: cb0cc86 [Kan Zhang] [SPARK-937] adding EXITED executor state and not relaunching cleanly exited executors (cherry picked from commit ca5d9d43b93abd279079b3be8a06fdd78c595510) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/609e5ff2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/609e5ff2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/609e5ff2 Branch: refs/heads/branch-1.0 Commit: 609e5ff20dc5f9eefbe1e6de8d21096de78ff8bd Parents: 868cf42 Author: Kan Zhang kzh...@apache.org Authored: Sun Jun 15 14:55:34 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sun Jun 15 14:55:53 2014 -0700 -- .../main/scala/org/apache/spark/deploy/ExecutorState.scala| 4 ++-- .../main/scala/org/apache/spark/deploy/master/Master.scala| 5 +++-- .../scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 7 +++ 3 files changed, 8 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/609e5ff2/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala index 37dfa7f..9f34d01 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala @@ -19,9 +19,9 @@ package org.apache.spark.deploy private[spark] object ExecutorState extends Enumeration { - val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value + val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value type ExecutorState = Value - def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state) + def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, 
EXITED).contains(state) } http://git-wip-us.apache.org/repos/asf/spark/blob/609e5ff2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6dec30..33ffcbd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -303,10 +303,11 @@ private[spark] class Master( appInfo.removeExecutor(exec) exec.worker.removeExecutor(exec) +val normalExit = exitStatus.exists(_ == 0) // Only retry certain number of times so we don't go into an infinite loop. -if (appInfo.incrementRetryCount ApplicationState.MAX_NUM_RETRY) { +if (!normalExit appInfo.incrementRetryCount ApplicationState.MAX_NUM_RETRY) {
svn commit: r1600800 [2/2] - in /spark: ./ site/ site/news/ site/releases/
Modified: spark/site/releases/spark-release-0-8-1.html URL: http://svn.apache.org/viewvc/spark/site/releases/spark-release-0-8-1.html?rev=1600800r1=1600799r2=1600800view=diff == --- spark/site/releases/spark-release-0-8-1.html (original) +++ spark/site/releases/spark-release-0-8-1.html Fri Jun 6 00:55:58 2014 @@ -163,7 +163,7 @@ pApache Spark 0.8.1 is a maintenance and performance release for the Scala 2.9 version of Spark. It also adds several new features, such as standalone mode high availability, that will appear in Spark 0.9 but developers wanted to have in Scala 2.9. Contributions to 0.8.1 came from 41 developers./p h3 id=yarn-22-supportYARN 2.2 Support/h3 -pSupport has been added for running Spark on YARN 2.2 and newer. Due to a change in the YARN API between previous versions and 2.2+, this was not supported in Spark 0.8.0. See the a href=/docs/0.8.1/running-on-yarn.htmlYARN documentation/a for specific instructions on how to build Spark for YARN 2.2+. Weâve also included a pre-compiled binary for YARN 2.2./p +pSupport has been added for running Spark on YARN 2.2 and newer. Due to a change in the YARN API between previous versions and 2.2+, this was not supported in Spark 0.8.0. See the a href=/docs/0.8.1/running-on-yarn.htmlYARN documentation/a for specific instructions on how to build Spark for YARN 2.2+. We#8217;ve also included a pre-compiled binary for YARN 2.2./p h3 id=high-availability-mode-for-standalone-cluster-managerHigh Availability Mode for Standalone Cluster Manager/h3 pThe standalone cluster manager now has a high availability (H/A) mode which can tolerate master failures. This is particularly useful for long-running applications such as streaming jobs and the shark server, where the scheduler master previously represented a single point of failure. Instructions for deploying H/A mode are included a href=/docs/0.8.1/spark-standalone.html#high-availabilityin the documentation/a. 
The current implementation uses Zookeeper for coordination./p @@ -174,7 +174,7 @@ ul liOptimized hashtables for shuffle data - reduces memory and CPU consumption/li liEfficient encoding for JobConfs - improves latency for stages reading large numbers of blocks from HDFS, S3, and HBase/li - liShuffle file consolidation (off by default) - reduces the number of files created in large shuffles for better filesystem performance. This change works best on filesystems newer than ext3 (we recommend ext4 or XFS), and it will be the default in Spark 0.9, but weâve left it off by default for compatibility. We recommend users turn this on unless they are using ext3 by setting codespark.shuffle.consolidateFiles/code to âtrueâ./li + liShuffle file consolidation (off by default) - reduces the number of files created in large shuffles for better filesystem performance. This change works best on filesystems newer than ext3 (we recommend ext4 or XFS), and it will be the default in Spark 0.9, but weâve left it off by default for compatibility. 
We recommend users turn this on unless they are using ext3 by setting codespark.shuffle.consolidateFiles/code to #8220;true#8221;./li liTorrent broadcast (off by default) - a faster broadcast implementation for large objects./li liSupport for fetching large result sets - allows tasks to return large results without tuning Akka buffer sizes./li /ul @@ -211,47 +211,47 @@ h3 id=creditsCredits/h3 ul - liMichael Armbrust â build fix/li - liPierre Borckmans â typo fix in documentation/li - liEvan Chan â codelocal:///code scheme for dependency jars/li - liEwen Cheslack-Postava â codeadd/code method for python accumulators, support for setting config properties in python/li - liMosharaf Chowdhury â optimized broadcast implementation/li - liFrank Dai â documentation fix/li - liAaron Davidson â shuffle file consolidation, H/A mode for standalone scheduler, cleaned up representation of block IDs, several improvements and bug fixes/li - liTathagata Das â new streaming operators, fix for kafka concurrency bug/li - liAnkur Dave â support for pausing spot clusters on EC2/li - liHarvey Feng â optimization to JobConf broadcasts, bug fixes, YARN 2.2 build/li - liAli Ghodsi â YARN 2.2 build/li - liThomas Graves â Spark YARN integration including secure HDFS access over YARN/li - liLi Guoqiang â fix for Maven build/li - liStephen Haberman â bug fix/li - liHaidar Hadi â documentation fix/li - liNathan Howell â bug fix relating to YARN/li - liHolden Karau â Java version of codemapPartitionsWithIndex/code/li - liDu Li â bug fix in make-distrubion.sh/li - liRaymond Liu â work on YARN 2.2 build/li - liXi Liu â bug fix and code clean-up/li - liDavid McCauley â bug fix in standalone mode JSON output/li - liMichael (wannabeast) â bug fix in memory store/li - liFabrizio Milo â typos in
git commit: Optionally include Hive as a dependency of the REPL.
Repository: spark Updated Branches: refs/heads/master 3ce81494c - 7463cd248 Optionally include Hive as a dependency of the REPL. Due to the way spark-shell launches from an assembly jar, I don't think this change will affect anyone who isn't trying to launch the shell directly from sbt. That said, it is kinda nice to be able to launch all things directly from SBT when developing. Author: Michael Armbrust mich...@databricks.com Closes #801 from marmbrus/hiveRepl and squashes the following commits: 9570571 [Michael Armbrust] Optionally include Hive as a dependency of the REPL. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7463cd24 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7463cd24 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7463cd24 Branch: refs/heads/master Commit: 7463cd248f81975bce9ff864002932864bd5b649 Parents: 3ce8149 Author: Michael Armbrust mich...@databricks.com Authored: Sat May 31 12:24:35 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sat May 31 12:24:35 2014 -0700 -- project/SparkBuild.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7463cd24/project/SparkBuild.scala -- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 9833411..64c9441 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -59,8 +59,10 @@ object SparkBuild extends Build { lazy val core = Project(core, file(core), settings = coreSettings) + def replDependencies = Seq[ProjectReference](core, graphx, bagel, mllib, sql) ++ maybeHiveRef + lazy val repl = Project(repl, file(repl), settings = replSettings) -.dependsOn(core, graphx, bagel, mllib, sql) +.dependsOn(replDependencies.map(a = a: sbt.ClasspathDep[sbt.ProjectReference]): _*) lazy val tools = Project(tools, file(tools), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
git commit: [SPARK-1901] worker should make sure executor has exited before updating executor's info
Repository: spark Updated Branches: refs/heads/branch-1.0 80721fb45 - 1696a4470 [SPARK-1901] worker should make sure executor has exited before updating executor's info https://issues.apache.org/jira/browse/SPARK-1901 Author: Zhen Peng zhenpen...@baidu.com Closes #854 from zhpengg/bugfix-worker-kills-executor and squashes the following commits: 21d380b [Zhen Peng] add some error messages 506cea6 [Zhen Peng] add some docs for killProcess() a0b9860 [Zhen Peng] [SPARK-1901] worker should make sure executor has exited before updating executor's info Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1696a447 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1696a447 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1696a447 Branch: refs/heads/branch-1.0 Commit: 1696a44704b8efa6515da9fa311f8acd9dda970e Parents: 80721fb Author: Zhen Peng zhenpen...@baidu.com Authored: Fri May 30 10:11:02 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri May 30 10:11:02 2014 -0700 -- .../spark/deploy/worker/ExecutorRunner.scala| 20 1 file changed, 12 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1696a447/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index 2051403..d27e0e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -61,17 +61,23 @@ private[spark] class ExecutorRunner( // Shutdown hook that kills actors on shutdown. 
shutdownHook = new Thread() { override def run() { -killProcess() +killProcess(Some(Worker shutting down)) } } Runtime.getRuntime.addShutdownHook(shutdownHook) } - private def killProcess() { + /** + * kill executor process, wait for exit and notify worker to update resource status + * + * @param message the exception message which caused the executor's death + */ + private def killProcess(message: Option[String]) { if (process != null) { logInfo(Killing process!) process.destroy() - process.waitFor() + val exitCode = process.waitFor() + worker ! ExecutorStateChanged(appId, execId, state, message, Some(exitCode)) } } @@ -82,7 +88,6 @@ private[spark] class ExecutorRunner( workerThread.interrupt() workerThread = null state = ExecutorState.KILLED - worker ! ExecutorStateChanged(appId, execId, state, None, None) Runtime.getRuntime.removeShutdownHook(shutdownHook) } } @@ -148,14 +153,13 @@ private[spark] class ExecutorRunner( } catch { case interrupted: InterruptedException = { logInfo(Runner thread for executor + fullId + interrupted) -killProcess() +state = ExecutorState.KILLED +killProcess(None) } case e: Exception = { logError(Error running executor, e) -killProcess() state = ExecutorState.FAILED -val message = e.getClass + : + e.getMessage -worker ! ExecutorStateChanged(appId, execId, state, Some(message), None) +killProcess(Some(e.toString)) } } }
git commit: SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers
Repository: spark Updated Branches: refs/heads/branch-1.0 fcb375026 - 214f90ee7 SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers `var cachedPeers: Seq[BlockManagerId] = null` is used in `def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel)` without proper protection. There are two places that call `replicate(blockId, bytesAfterPut, level)`: * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L644 runs in `connectionManager.futureExecContext` * https://github.com/apache/spark/blob/17f3075bc4aa8cbed165f7b367f70e84b1bc8db9/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L752 `doPut` runs in `connectionManager.handleMessageExecutor`. `org.apache.spark.storage.BlockManagerWorker` calls `blockManager.putBytes` in `connectionManager.handleMessageExecutor`. As they run in different `Executor`s, this is a race condition which may cause the memory pointed to by `cachedPeers` to be incorrect even if `cachedPeers != null`. The race condition of `onReceiveCallback` is that it's set in `BlockManagerWorker` but read in a different thread in `ConnectionManager.handleMessageExecutor`.
Author: zsxwing zsxw...@gmail.com Closes #887 from zsxwing/SPARK-1932 and squashes the following commits: 524f69c [zsxwing] SPARK-1932: Fix race conditions in onReceiveCallback and cachedPeers (cherry picked from commit 549830b0db2c8b069391224f3a73bb0d7f397f71) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/214f90ee Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/214f90ee Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/214f90ee Branch: refs/heads/branch-1.0 Commit: 214f90ee7910fada1dc58ecc95be0a83a1356a2d Parents: fcb3750 Author: zsxwing zsxw...@gmail.com Authored: Mon May 26 23:17:39 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Mon May 26 23:17:50 2014 -0700 -- .../main/scala/org/apache/spark/network/ConnectionManager.scala | 3 ++- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/214f90ee/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index dcbbc18..5dd5fd0 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -93,7 +93,8 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf, implicit val futureExecContext = ExecutionContext.fromExecutor( Utils.newDaemonCachedThreadPool(Connection manager future execution context)) - private var onReceiveCallback: (BufferMessage, ConnectionManagerId) = Option[Message]= null + @volatile + private var onReceiveCallback: (BufferMessage, ConnectionManagerId) = Option[Message] = null private val authEnabled = securityManager.isAuthenticationEnabled() 
http://git-wip-us.apache.org/repos/asf/spark/blob/214f90ee/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6534095..6e45008 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -772,7 +772,7 @@ private[spark] class BlockManager( /** * Replicate block to another node. */ - var cachedPeers: Seq[BlockManagerId] = null + @volatile var cachedPeers: Seq[BlockManagerId] = null private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel) { val tLevel = StorageLevel( level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
git commit: bugfix worker DriverStateChanged state should match DriverState.FAILED
Repository: spark Updated Branches: refs/heads/master 549830b0d -> 95e4c9c6f bugfix worker DriverStateChanged state should match DriverState.FAILED bugfix worker DriverStateChanged state should match DriverState.FAILED Author: lianhuiwang lianhuiwan...@gmail.com Closes #864 from lianhuiwang/master and squashes the following commits: 480ce94 [lianhuiwang] address aarondav comments f2b5970 [lianhuiwang] bugfix worker DriverStateChanged state should match DriverState.FAILED Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95e4c9c6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95e4c9c6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95e4c9c6 Branch: refs/heads/master Commit: 95e4c9c6fb153b7f0aa4c442c4bdb6552d326640 Parents: 549830b Author: lianhuiwang lianhuiwan...@gmail.com Authored: Tue May 27 11:53:38 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue May 27 11:53:38 2014 -0700 -- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++++ 1 file changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/95e4c9c6/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8b67479..100de26 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -317,10 +317,14 @@ private[spark] class Worker( state match { case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") +case DriverState.FAILED => + logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => logInfo(s"Driver $driverId was killed by user") +case _ => + logDebug(s"Driver $driverId changed state to 
$state") } masterLock.synchronized { master ! DriverStateChanged(driverId, state, exception)
git commit: bugfix worker DriverStateChanged state should match DriverState.FAILED
Repository: spark Updated Branches: refs/heads/branch-1.0 214f90ee7 -> 30be37ca7 bugfix worker DriverStateChanged state should match DriverState.FAILED bugfix worker DriverStateChanged state should match DriverState.FAILED Author: lianhuiwang lianhuiwan...@gmail.com Closes #864 from lianhuiwang/master and squashes the following commits: 480ce94 [lianhuiwang] address aarondav comments f2b5970 [lianhuiwang] bugfix worker DriverStateChanged state should match DriverState.FAILED Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/30be37ca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/30be37ca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/30be37ca Branch: refs/heads/branch-1.0 Commit: 30be37ca7e7dec92cf48692e1a707608871b4314 Parents: 214f90e Author: lianhuiwang lianhuiwan...@gmail.com Authored: Tue May 27 11:53:38 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Tue May 27 11:54:53 2014 -0700 -- core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 4 ++++ 1 file changed, 4 insertions(+) -- http://git-wip-us.apache.org/repos/asf/spark/blob/30be37ca/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8b67479..100de26 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -317,10 +317,14 @@ private[spark] class Worker( state match { case DriverState.ERROR => logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") +case DriverState.FAILED => + logWarning(s"Driver $driverId exited with failure") case DriverState.FINISHED => logInfo(s"Driver $driverId exited successfully") case DriverState.KILLED => logInfo(s"Driver $driverId was killed by user") +case _ => + logDebug(s"Driver $driverId changed 
state to $state") } masterLock.synchronized { master ! DriverStateChanged(driverId, state, exception)
git commit: [SPARK-1688] Propagate PySpark worker stderr to driver
Repository: spark Updated Branches: refs/heads/master d00981a95 - 520087224 [SPARK-1688] Propagate PySpark worker stderr to driver When at least one of the following conditions is true, PySpark cannot be loaded: 1. PYTHONPATH is not set 2. PYTHONPATH does not contain the python directory (or jar, in the case of YARN) 3. The jar does not contain pyspark files (YARN) 4. The jar does not contain py4j files (YARN) However, we currently throw the same random `java.io.EOFException` for all of the above cases, when trying to read from the python daemon's output. This message is super unhelpful. This PR includes the python stderr and the PYTHONPATH in the exception propagated to the driver. Now, the exception message looks something like: ``` Error from python worker: : No module named pyspark PYTHONPATH was: /path/to/spark/python:/path/to/some/jar java.io.EOFException stack trace ``` whereas before it was just ``` java.io.EOFException stack trace ``` Author: Andrew Or andrewo...@gmail.com Closes #603 from andrewor14/pyspark-exception and squashes the following commits: 10d65d3 [Andrew Or] Throwable - Exception, worker - daemon 862d1d7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception a5ed798 [Andrew Or] Use block string and interpolation instead of var (minor) cc09c45 [Andrew Or] Account for the fact that the python daemon may not have terminated yet 444f019 [Andrew Or] Use the new RedirectThread + include system PYTHONPATH aab00ae [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 0cc2402 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 783efe2 [Andrew Or] Make python daemon stderr indentation consistent 9524172 [Andrew Or] Avoid potential NPE / error stream contention + Move things around 29f9688 [Andrew Or] Add back original exception type e92d36b [Andrew Or] Include python worker stderr in the exception propagated to the driver 7c69360 [Andrew Or] Merge branch 
'master' of github.com:apache/spark into pyspark-exception cdbc185 [Andrew Or] Fix python attribute not found exception when PYTHONPATH is not set dcc0353 [Andrew Or] Check both python and system environment variables for PYTHONPATH 6c09c21 [Andrew Or] Validate PYTHONPATH and PySpark modules before starting python workers Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/52008722 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/52008722 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/52008722 Branch: refs/heads/master Commit: 5200872243aa5906dc8a06772e61d75f19557aac Parents: d00981a Author: Andrew Or andrewo...@gmail.com Authored: Wed May 7 14:35:22 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed May 7 14:35:22 2014 -0700 -- .../apache/spark/api/python/PythonUtils.scala | 27 +++- .../spark/api/python/PythonWorkerFactory.scala | 136 --- .../org/apache/spark/deploy/PythonRunner.scala | 24 +--- .../scala/org/apache/spark/util/Utils.scala | 37 + 4 files changed, 123 insertions(+), 101 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/52008722/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index cf69fa1..6d3e257 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.python -import java.io.File +import java.io.{File, InputStream, IOException, OutputStream} import scala.collection.mutable.ArrayBuffer @@ -40,3 +40,28 @@ private[spark] object PythonUtils { paths.filter(_ != ).mkString(File.pathSeparator) } } + + +/** + * A utility class to redirect the child process's stdout or stderr. 
+ */ +private[spark] class RedirectThread( +in: InputStream, +out: OutputStream, +name: String) + extends Thread(name) { + + setDaemon(true) + override def run() { +scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { +out.write(buf, 0, len) +out.flush() +len = in.read(buf) + } +} + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/52008722/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala -- diff --git
git commit: SPARK-1801. expose InterruptibleIterator and TaskKilledException in deve...
Repository: spark Updated Branches: refs/heads/master 6ce088444 - b22952fa1 SPARK-1801. expose InterruptibleIterator and TaskKilledException in deve... ...loper api Author: Koert Kuipers ko...@tresata.com Closes #764 from koertkuipers/feat-rdd-developerapi and squashes the following commits: 8516dd2 [Koert Kuipers] SPARK-1801. expose InterruptibleIterator and TaskKilledException in developer api Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b22952fa Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b22952fa Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b22952fa Branch: refs/heads/master Commit: b22952fa1f21c0b93208846b5e1941a9d2578c6f Parents: 6ce0884 Author: Koert Kuipers ko...@tresata.com Authored: Wed May 14 00:10:12 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed May 14 00:12:35 2014 -0700 -- .../main/scala/org/apache/spark/InterruptibleIterator.scala | 6 +- .../main/scala/org/apache/spark/TaskKilledException.scala| 8 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b22952fa/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala -- diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala index ec11dbb..f40baa8 100644 --- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala +++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala @@ -17,11 +17,15 @@ package org.apache.spark +import org.apache.spark.annotation.DeveloperApi + /** + * :: DeveloperApi :: * An iterator that wraps around an existing iterator to provide task killing functionality. * It works by checking the interrupted flag in [[TaskContext]]. 
*/ -private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T]) +@DeveloperApi +class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T]) extends Iterator[T] { def hasNext: Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/b22952fa/core/src/main/scala/org/apache/spark/TaskKilledException.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskKilledException.scala b/core/src/main/scala/org/apache/spark/TaskKilledException.scala index cbd6b28..ad487c4 100644 --- a/core/src/main/scala/org/apache/spark/TaskKilledException.scala +++ b/core/src/main/scala/org/apache/spark/TaskKilledException.scala @@ -17,7 +17,11 @@ package org.apache.spark +import org.apache.spark.annotation.DeveloperApi + /** - * Exception for a task getting killed. + * :: DeveloperApi :: + * Exception thrown when a task is explicitly killed (i.e., task failure is expected). */ -private[spark] class TaskKilledException extends RuntimeException +@DeveloperApi +class TaskKilledException extends RuntimeException
git commit: SPARK-1801. expose InterruptibleIterator and TaskKilledException in deve...
Repository: spark Updated Branches: refs/heads/branch-1.0 f66f76648 - 7da80a318 SPARK-1801. expose InterruptibleIterator and TaskKilledException in deve... ...loper api Author: Koert Kuipers ko...@tresata.com Closes #764 from koertkuipers/feat-rdd-developerapi and squashes the following commits: 8516dd2 [Koert Kuipers] SPARK-1801. expose InterruptibleIterator and TaskKilledException in developer api (cherry picked from commit b22952fa1f21c0b93208846b5e1941a9d2578c6f) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7da80a31 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7da80a31 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7da80a31 Branch: refs/heads/branch-1.0 Commit: 7da80a3186e9120c26ed88dc1211356a1d5eb8af Parents: f66f766 Author: Koert Kuipers ko...@tresata.com Authored: Wed May 14 00:10:12 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed May 14 00:12:59 2014 -0700 -- .../main/scala/org/apache/spark/InterruptibleIterator.scala | 6 +- .../main/scala/org/apache/spark/TaskKilledException.scala| 8 ++-- 2 files changed, 11 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7da80a31/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala -- diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala index ec11dbb..f40baa8 100644 --- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala +++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala @@ -17,11 +17,15 @@ package org.apache.spark +import org.apache.spark.annotation.DeveloperApi + /** + * :: DeveloperApi :: * An iterator that wraps around an existing iterator to provide task killing functionality. * It works by checking the interrupted flag in [[TaskContext]]. 
*/ -private[spark] class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T]) +@DeveloperApi +class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T]) extends Iterator[T] { def hasNext: Boolean = { http://git-wip-us.apache.org/repos/asf/spark/blob/7da80a31/core/src/main/scala/org/apache/spark/TaskKilledException.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskKilledException.scala b/core/src/main/scala/org/apache/spark/TaskKilledException.scala index cbd6b28..ad487c4 100644 --- a/core/src/main/scala/org/apache/spark/TaskKilledException.scala +++ b/core/src/main/scala/org/apache/spark/TaskKilledException.scala @@ -17,7 +17,11 @@ package org.apache.spark +import org.apache.spark.annotation.DeveloperApi + /** - * Exception for a task getting killed. + * :: DeveloperApi :: + * Exception thrown when a task is explicitly killed (i.e., task failure is expected). */ -private[spark] class TaskKilledException extends RuntimeException +@DeveloperApi +class TaskKilledException extends RuntimeException
git commit: [SPARK-1769] Executor loss causes NPE race condition
Repository: spark Updated Branches: refs/heads/branch-1.0 b3d987893 - 69ec3149f [SPARK-1769] Executor loss causes NPE race condition This PR replaces the Schedulable data structures in Pool.scala with thread-safe ones from java. Note that Scala's `with SynchronizedBuffer` trait is soon to be deprecated in 2.11 because it is [inherently unreliable](http://www.scala-lang.org/api/2.11.0/index.html#scala.collection.mutable.SynchronizedBuffer). We should slowly drift away from `SynchronizedBuffer` in other places too. Note that this PR introduces an API-breaking change; `sc.getAllPools` now returns an Array rather than an ArrayBuffer. This is because we want this method to return an immutable copy rather than one may potentially confuse the user if they try to modify the copy, which takes no effect on the original data structure. Author: Andrew Or andrewo...@gmail.com Closes #762 from andrewor14/pool-npe and squashes the following commits: 383e739 [Andrew Or] JavaConverters - JavaConversions 3f32981 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pool-npe 769be19 [Andrew Or] Assorted minor changes 2189247 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pool-npe 05ad9e9 [Andrew Or] Fix test - contains is not the same as containsKey 0921ea0 [Andrew Or] var - val 07d720c [Andrew Or] Synchronize Schedulable data structures (cherry picked from commit 69f750228f3ec8537a93da08e712596fa8004143) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69ec3149 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69ec3149 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69ec3149 Branch: refs/heads/branch-1.0 Commit: 69ec3149fb4d732935748b9afee4f9d8a7b1244e Parents: b3d9878 Author: Andrew Or andrewo...@gmail.com Authored: Wed May 14 00:54:33 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed May 14 
00:54:49 2014 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 20 - .../scala/org/apache/spark/scheduler/Pool.scala | 31 ++-- .../apache/spark/scheduler/Schedulable.scala| 6 ++-- .../spark/scheduler/TaskSchedulerImpl.scala | 2 +- .../scheduler/TaskSchedulerImplSuite.scala | 2 +- 5 files changed, 35 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/69ec3149/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index c43b4fd..032b3d7 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -17,15 +17,17 @@ package org.apache.spark +import scala.language.implicitConversions + import java.io._ import java.net.URI import java.util.concurrent.atomic.AtomicInteger import java.util.{Properties, UUID} import java.util.UUID.randomUUID import scala.collection.{Map, Set} +import scala.collection.JavaConversions._ import scala.collection.generic.Growable -import scala.collection.mutable.{ArrayBuffer, HashMap} -import scala.language.implicitConversions +import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging { } /** - * Return pools for fair scheduler - * TODO(xiajunluan): We should take nested pools into account + * :: DeveloperApi :: + * Return pools for fair scheduler */ - def getAllPools: ArrayBuffer[Schedulable] = { -taskScheduler.rootPool.schedulableQueue + @DeveloperApi + def getAllPools: Seq[Schedulable] = { +// TODO(xiajunluan): We should take nested pools into account +taskScheduler.rootPool.schedulableQueue.toSeq } /** + * :: DeveloperApi :: * Return the pool associated with the given name, if one exists */ + @DeveloperApi def 
getPoolForName(pool: String): Option[Schedulable] = { -taskScheduler.rootPool.schedulableNameToSchedulable.get(pool) +Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69ec3149/core/src/main/scala/org/apache/spark/scheduler/Pool.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 187672c..174b732 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++
git commit: SPARK-1770: Revert accidental(?) fix
Repository: spark Updated Branches: refs/heads/branch-1.0 80f292a21 -> 8202276c9 SPARK-1770: Revert accidental(?) fix Looks like this change was accidentally committed here: https://github.com/apache/spark/commit/06b15baab25951d124bbe6b64906f4139e037deb but the change does not show up in the PR itself (#704). Other than not intending to go in with that PR, this also broke the test JavaAPISuite.repartition. Author: Aaron Davidson aa...@databricks.com Closes #716 from aarondav/shufflerand and squashes the following commits: b1cf70b [Aaron Davidson] SPARK-1770: Revert accidental(?) fix (cherry picked from commit 59577df14c06417676a9ffdd599f5713c448e299) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8202276c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8202276c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8202276c Branch: refs/heads/branch-1.0 Commit: 8202276c916879eeb64e2b5591aa0faf5b0172bd Parents: 80f292a Author: Aaron Davidson aa...@databricks.com Authored: Fri May 9 14:51:34 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Fri May 9 14:52:13 2014 -0700 -- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8202276c/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 9d8d804..a1ca612 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -330,9 +330,9 @@ abstract class RDD[T: ClassTag]( if (shuffle) { // include a shuffle step so that our upstream tasks are still distributed new CoalescedRDD( -new ShuffledRDD[Int, T, (Int, T)](map(x => (Utils.random.nextInt(), x)), +new ShuffledRDD[T, Null, (T, Null)](map(x
=> (x, null)), new HashPartitioner(numPartitions)), -numPartitions).values +numPartitions).keys } else { new CoalescedRDD(this, numPartitions) }
git commit: [SPARK-1745] Move interrupted flag from TaskContext constructor (minor)
Repository: spark Updated Branches: refs/heads/master 44dd57fb6 - c3f8b78c2 [SPARK-1745] Move interrupted flag from TaskContext constructor (minor) It makes little sense to start a TaskContext that is interrupted. Indeed, I searched for all use cases of it and didn't find a single instance in which `interrupted` is true on construction. This was inspired by reviewing #640, which adds an additional `@volatile var completed` that is similar. These are not the most urgent changes, but I wanted to push them out before I forget. Author: Andrew Or andrewo...@gmail.com Closes #675 from andrewor14/task-context and squashes the following commits: 9575e02 [Andrew Or] Add space 69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context c471490 [Andrew Or] Oops, removed one flag too many. Adding it back. 85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3f8b78c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3f8b78c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3f8b78c Branch: refs/heads/master Commit: c3f8b78c211df6c5adae74f37e39fb55baeff723 Parents: 44dd57f Author: Andrew Or andrewo...@gmail.com Authored: Thu May 8 12:13:07 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Thu May 8 12:13:07 2014 -0700 -- .../scala/org/apache/spark/TaskContext.scala| 20 +++- .../apache/spark/scheduler/ShuffleMapTask.scala | 3 +-- .../java/org/apache/spark/JavaAPISuite.java | 2 +- .../org/apache/spark/CacheManagerSuite.scala| 10 +++--- .../scala/org/apache/spark/PipedRDDSuite.scala | 4 +--- 5 files changed, 17 insertions(+), 22 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/c3f8b78c/core/src/main/scala/org/apache/spark/TaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 
fc48127..51f40c3 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -28,13 +28,12 @@ import org.apache.spark.executor.TaskMetrics */ @DeveloperApi class TaskContext( - val stageId: Int, - val partitionId: Int, - val attemptId: Long, - val runningLocally: Boolean = false, - @volatile var interrupted: Boolean = false, - private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty -) extends Serializable { +val stageId: Int, +val partitionId: Int, +val attemptId: Long, +val runningLocally: Boolean = false, +private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty) + extends Serializable { @deprecated("use partitionId", "0.8.1") def splitId = partitionId @@ -42,7 +41,10 @@ class TaskContext( // List of callback functions to execute when the task completes. @transient private val onCompleteCallbacks = new ArrayBuffer[() => Unit] - // Set to true when the task is completed, before the onCompleteCallbacks are executed. + // Whether the corresponding task has been killed. + @volatile var interrupted: Boolean = false + + // Whether the task has completed, before the onCompleteCallbacks are executed.
@volatile var completed: Boolean = false /** @@ -58,6 +60,6 @@ class TaskContext( def executeOnCompleteCallbacks() { completed = true // Process complete callbacks in the reverse order of registration -onCompleteCallbacks.reverse.foreach{_()} +onCompleteCallbacks.reverse.foreach { _() } } } http://git-wip-us.apache.org/repos/asf/spark/blob/c3f8b78c/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala index 2259df0..4b0324f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala @@ -23,7 +23,6 @@ import java.io._ import java.util.zip.{GZIPInputStream, GZIPOutputStream} import scala.collection.mutable.HashMap -import scala.util.Try import org.apache.spark._ import org.apache.spark.executor.ShuffleWriteMetrics @@ -70,7 +69,7 @@ private[spark] object ShuffleMapTask { } // Since both the JarSet and FileSet have the same format this is used for both. - def deserializeFileSet(bytes: Array[Byte]) : HashMap[String, Long] = { + def deserializeFileSet(bytes: Array[Byte]): HashMap[String, Long] = { val in = new GZIPInputStream(new ByteArrayInputStream(bytes)) val objIn = new
git commit: [SPARK-1688] Propagate PySpark worker stderr to driver
Repository: spark Updated Branches: refs/heads/branch-1.0 0759ee790 - 82c8e89c9 [SPARK-1688] Propagate PySpark worker stderr to driver When at least one of the following conditions is true, PySpark cannot be loaded: 1. PYTHONPATH is not set 2. PYTHONPATH does not contain the python directory (or jar, in the case of YARN) 3. The jar does not contain pyspark files (YARN) 4. The jar does not contain py4j files (YARN) However, we currently throw the same random `java.io.EOFException` for all of the above cases, when trying to read from the python daemon's output. This message is super unhelpful. This PR includes the python stderr and the PYTHONPATH in the exception propagated to the driver. Now, the exception message looks something like: ``` Error from python worker: : No module named pyspark PYTHONPATH was: /path/to/spark/python:/path/to/some/jar java.io.EOFException stack trace ``` whereas before it was just ``` java.io.EOFException stack trace ``` Author: Andrew Or andrewo...@gmail.com Closes #603 from andrewor14/pyspark-exception and squashes the following commits: 10d65d3 [Andrew Or] Throwable - Exception, worker - daemon 862d1d7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception a5ed798 [Andrew Or] Use block string and interpolation instead of var (minor) cc09c45 [Andrew Or] Account for the fact that the python daemon may not have terminated yet 444f019 [Andrew Or] Use the new RedirectThread + include system PYTHONPATH aab00ae [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 0cc2402 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-exception 783efe2 [Andrew Or] Make python daemon stderr indentation consistent 9524172 [Andrew Or] Avoid potential NPE / error stream contention + Move things around 29f9688 [Andrew Or] Add back original exception type e92d36b [Andrew Or] Include python worker stderr in the exception propagated to the driver 7c69360 [Andrew Or] Merge branch 
'master' of github.com:apache/spark into pyspark-exception cdbc185 [Andrew Or] Fix python attribute not found exception when PYTHONPATH is not set dcc0353 [Andrew Or] Check both python and system environment variables for PYTHONPATH 6c09c21 [Andrew Or] Validate PYTHONPATH and PySpark modules before starting python workers (cherry picked from commit 5200872243aa5906dc8a06772e61d75f19557aac) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82c8e89c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82c8e89c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82c8e89c Branch: refs/heads/branch-1.0 Commit: 82c8e89c9581c45c7878b8f406cf3d90d4b0d74c Parents: 0759ee7 Author: Andrew Or andrewo...@gmail.com Authored: Wed May 7 14:35:22 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed May 7 14:35:37 2014 -0700 -- .../apache/spark/api/python/PythonUtils.scala | 27 +++- .../spark/api/python/PythonWorkerFactory.scala | 136 --- .../org/apache/spark/deploy/PythonRunner.scala | 24 +--- .../scala/org/apache/spark/util/Utils.scala | 37 + 4 files changed, 123 insertions(+), 101 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index cf69fa1..6d3e257 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.python -import java.io.File +import java.io.{File, InputStream, IOException, OutputStream} import scala.collection.mutable.ArrayBuffer @@ -40,3 +40,28 @@ private[spark] object PythonUtils { paths.filter(_ != ).mkString(File.pathSeparator) } } + + +/** + * A 
utility class to redirect the child process's stdout or stderr. + */ +private[spark] class RedirectThread( +in: InputStream, +out: OutputStream, +name: String) + extends Thread(name) { + + setDaemon(true) + override def run() { +scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. + val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { +out.write(buf, 0, len) +out.flush() +len = in.read(buf) + } +} + } +}
git commit: SPARK-1686: keep schedule() calling in the main thread
Repository: spark Updated Branches: refs/heads/branch-1.0 8202276c9 - adf8cdd0b SPARK-1686: keep schedule() calling in the main thread https://issues.apache.org/jira/browse/SPARK-1686 moved from original JIRA (by @markhamstra): In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties. There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule() ) from that Akka scheduler thread. In this PR, I added a new master message TriggerSchedule to trigger the local call of schedule() in the scheduler thread Author: CodingCat zhunans...@gmail.com Closes #639 from CodingCat/SPARK-1686 and squashes the following commits: 81bb4ca [CodingCat] rename variable 69e0a2a [CodingCat] style fix 36a2ac0 [CodingCat] address Aaron's comments ec9b7bb [CodingCat] address the comments 02b37ca [CodingCat] keep schedule() calling in the main thread (cherry picked from commit 2f452cbaf35dbc609ab48ec0ee5e3dd7b6b9b790) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adf8cdd0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adf8cdd0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adf8cdd0 Branch: refs/heads/branch-1.0 Commit: adf8cdd0b29731325f08552d050c43fe1bbd724f Parents: 8202276 Author: CodingCat zhunans...@gmail.com Authored: Fri May 9 21:50:23 2014 -0700 Committer: Aaron Davidson aa...@databricks.com 
Committed: Fri May 9 21:52:40 2014 -0700 -- .../org/apache/spark/deploy/master/Master.scala | 15 --- 1 file changed, 12 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/adf8cdd0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index fdb633b..f254f55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -104,6 +104,8 @@ private[spark] class Master( var leaderElectionAgent: ActorRef = _ + private var recoveryCompletionTask: Cancellable = _ + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. @@ -152,6 +154,10 @@ private[spark] class Master( } override def postStop() { +// prevent the CompleteRecovery message sending to restarted master +if (recoveryCompletionTask != null) { + recoveryCompletionTask.cancel() +} webUi.stop() fileSystemsUsed.foreach(_.close()) masterMetricsSystem.stop() @@ -171,10 +177,13 @@ private[spark] class Master( logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedDrivers, storedWorkers) -context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } +recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, + CompleteRecovery) } } +case CompleteRecovery => completeRecovery() + case RevokedLeadership => { logError("Leadership has been revoked -- master shutting down.") System.exit(0) @@ -465,7 +474,7 @@ private[spark] class Master( * Schedule the currently available resources among waiting apps.
This method will be called * every time a new app joins or resource availability changes. */ - def schedule() { + private def schedule() { if (state != RecoveryState.ALIVE) { return } // First schedule drivers, they take strict precedence over applications @@ -485,7 +494,7 @@ private[spark] class Master( // Try to spread out each app among all the nodes, until it has all its cores for (app - waitingApps if app.coresLeft 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app,
git commit: SPARK-1700: Close socket file descriptors on task completion
Repository: spark Updated Branches: refs/heads/branch-1.0 a314342da - d2cbd3d76 SPARK-1700: Close socket file descriptors on task completion This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets. Tested in standalone mode. More file descriptors spawn than expected (around 1000ish rather than the expected 8ish) but they do not pile up between runs, or as high as before (where they went up to around 5k). Author: Aaron Davidson aa...@databricks.com Closes #623 from aarondav/pyspark2 and squashes the following commits: 0ca13bb [Aaron Davidson] SPARK-1700: Close socket file descriptors on task completion (cherry picked from commit 0a14421765b672305e8f32ded4a9a1f6f7241d8d) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2cbd3d7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2cbd3d7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2cbd3d7 Branch: refs/heads/branch-1.0 Commit: d2cbd3d766ac96de75f9b519696a83a9b810e21c Parents: a314342 Author: Aaron Davidson aa...@databricks.com Authored: Fri May 2 23:55:13 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sat May 3 00:12:09 2014 -0700 -- .../scala/org/apache/spark/api/python/PythonRDD.scala| 11 ++- 1 file changed, 10 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/d2cbd3d7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 672c344..6140700 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag]( override def compute(split: Partition, context: 
TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis val env = SparkEnv.get -val worker = env.createPythonWorker(pythonExec, envVars.toMap) +val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) + +// Ensure worker socket is closed on task completion. Closing sockets is idempotent. +context.addOnCompleteCallback(() = + try { +worker.close() + } catch { +case e: Exception = logWarning(Failed to close worker socket, e) + } +) @volatile var readerException: Exception = null
git commit: [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak
Repository: spark Updated Branches: refs/heads/branch-1.0 34f22bcc4 - 0441515f2 [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak Move the doAs in Executor higher up so that we only have 1 ugi and aren't leaking filesystems. Fix spark on yarn to work when the cluster is running as user yarn but the clients are launched as the user and want to read/write to hdfs as the user. Note this hasn't been fully tested yet. Need to test in standalone mode. Putting this up for people to look at and possibly test. I don't have access to a mesos cluster. This is alternative to https://github.com/apache/spark/pull/607 Author: Thomas Graves tgra...@apache.org Closes #621 from tgravescs/SPARK-1676 and squashes the following commits: 244d55a [Thomas Graves] fix line length 44163d4 [Thomas Graves] Rework 9398853 [Thomas Graves] change to have doAs in executor higher up. (cherry picked from commit 3d0a02dff3011e8894d98d903cd086bc95e56807) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0441515f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0441515f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0441515f Branch: refs/heads/branch-1.0 Commit: 0441515f221146756800dc583b225bdec8a6c075 Parents: 34f22bc Author: Thomas Graves tgra...@apache.org Authored: Sat May 3 10:59:05 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sat May 3 11:01:54 2014 -0700 -- .../apache/spark/deploy/SparkHadoopUtil.scala | 17 ++-- .../executor/CoarseGrainedExecutorBackend.scala | 44 +++- .../org/apache/spark/executor/Executor.scala| 4 +- .../spark/executor/MesosExecutorBackend.scala | 14 --- .../spark/deploy/yarn/ApplicationMaster.scala | 10 ++--- .../spark/deploy/yarn/ExecutorLauncher.scala| 7 +++- .../spark/deploy/yarn/ApplicationMaster.scala | 12 +++--- .../spark/deploy/yarn/ExecutorLauncher.scala| 7 +++- 8 files changed, 69 
insertions(+), 46 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/0441515f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 498fcc5..e2df1b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{Logging, SparkContext, SparkException} import scala.collection.JavaConversions._ /** * Contains util methods to interact with Hadoop from Spark. */ -class SparkHadoopUtil { +class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration() UserGroupInformation.setConfiguration(conf) - def runAsUser(user: String)(func: () = Unit) { + /** + * Runs the given function with a Hadoop UserGroupInformation as a thread local variable + * (distributed to child threads), used for authenticating HDFS and YARN calls. 
+ * + * IMPORTANT NOTE: If this function is going to be called repeated in the same process + * you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly + * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems + */ + def runAsSparkUser(func: () = Unit) { +val user = Option(System.getenv(SPARK_USER)).getOrElse(SparkContext.SPARK_UNKNOWN_USER) if (user != SparkContext.SPARK_UNKNOWN_USER) { + logDebug(running as user: + user) val ugi = UserGroupInformation.createRemoteUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) ugi.doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = func() }) } else { + logDebug(running as SPARK_UNKNOWN_USER) func() } } http://git-wip-us.apache.org/repos/asf/spark/blob/0441515f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9ac7365..e912ae8 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -22,8 +22,9 @@ import
git commit: [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak
Repository: spark Updated Branches: refs/heads/master 9347565f4 - 3d0a02dff [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak Move the doAs in Executor higher up so that we only have 1 ugi and aren't leaking filesystems. Fix spark on yarn to work when the cluster is running as user yarn but the clients are launched as the user and want to read/write to hdfs as the user. Note this hasn't been fully tested yet. Need to test in standalone mode. Putting this up for people to look at and possibly test. I don't have access to a mesos cluster. This is alternative to https://github.com/apache/spark/pull/607 Author: Thomas Graves tgra...@apache.org Closes #621 from tgravescs/SPARK-1676 and squashes the following commits: 244d55a [Thomas Graves] fix line length 44163d4 [Thomas Graves] Rework 9398853 [Thomas Graves] change to have doAs in executor higher up. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d0a02df Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d0a02df Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d0a02df Branch: refs/heads/master Commit: 3d0a02dff3011e8894d98d903cd086bc95e56807 Parents: 9347565 Author: Thomas Graves tgra...@apache.org Authored: Sat May 3 10:59:05 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sat May 3 10:59:05 2014 -0700 -- .../apache/spark/deploy/SparkHadoopUtil.scala | 17 ++-- .../executor/CoarseGrainedExecutorBackend.scala | 44 +++- .../org/apache/spark/executor/Executor.scala| 4 +- .../spark/executor/MesosExecutorBackend.scala | 14 --- .../spark/deploy/yarn/ApplicationMaster.scala | 10 ++--- .../spark/deploy/yarn/ExecutorLauncher.scala| 7 +++- .../spark/deploy/yarn/ApplicationMaster.scala | 12 +++--- .../spark/deploy/yarn/ExecutorLauncher.scala| 7 +++- 8 files changed, 69 insertions(+), 46 deletions(-) -- 
http://git-wip-us.apache.org/repos/asf/spark/blob/3d0a02df/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 498fcc5..e2df1b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,25 +24,36 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{Logging, SparkContext, SparkException} import scala.collection.JavaConversions._ /** * Contains util methods to interact with Hadoop from Spark. */ -class SparkHadoopUtil { +class SparkHadoopUtil extends Logging { val conf: Configuration = newConfiguration() UserGroupInformation.setConfiguration(conf) - def runAsUser(user: String)(func: () = Unit) { + /** + * Runs the given function with a Hadoop UserGroupInformation as a thread local variable + * (distributed to child threads), used for authenticating HDFS and YARN calls. 
+ * + * IMPORTANT NOTE: If this function is going to be called repeated in the same process + * you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly + * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems + */ + def runAsSparkUser(func: () = Unit) { +val user = Option(System.getenv(SPARK_USER)).getOrElse(SparkContext.SPARK_UNKNOWN_USER) if (user != SparkContext.SPARK_UNKNOWN_USER) { + logDebug(running as user: + user) val ugi = UserGroupInformation.createRemoteUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) ugi.doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = func() }) } else { + logDebug(running as SPARK_UNKNOWN_USER) func() } } http://git-wip-us.apache.org/repos/asf/spark/blob/3d0a02df/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala -- diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9ac7365..e912ae8 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -22,8 +22,9 @@ import java.nio.ByteBuffer import akka.actor._ import akka.remote._ -import org.apache.spark.{SecurityManager, SparkConf, Logging}
git commit: [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak
Repository: spark Updated Branches: refs/heads/branch-0.9 54c3b7e3b - 45561cd9f [WIP] SPARK-1676: Cache Hadoop UGIs by default to prevent FileSystem leak Move the doAs in Executor higher up so that we only have 1 ugi and aren't leaking filesystems. Fix spark on yarn to work when the cluster is running as user yarn but the clients are launched as the user and want to read/write to hdfs as the user. Note this hasn't been fully tested yet. Need to test in standalone mode. Putting this up for people to look at and possibly test. I don't have access to a mesos cluster. This is alternative to https://github.com/apache/spark/pull/607 Author: Thomas Graves tgra...@apache.org Closes #621 from tgravescs/SPARK-1676 and squashes the following commits: 244d55a [Thomas Graves] fix line length 44163d4 [Thomas Graves] Rework 9398853 [Thomas Graves] change to have doAs in executor higher up. (cherry picked from commit 3d0a02dff3011e8894d98d903cd086bc95e56807) Signed-off-by: Aaron Davidson aa...@databricks.com Conflicts: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/WorkerLauncher.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45561cd9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45561cd9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45561cd9 Branch: refs/heads/branch-0.9 Commit: 45561cd9fcf0445274d82da495401d32b62aa915 Parents: 54c3b7e Author: Thomas Graves tgra...@apache.org Authored: Sat May 3 10:59:05 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Sat May 3 11:13:06 2014 -0700 -- .../apache/spark/deploy/SparkHadoopUtil.scala | 17 +++-- 
.../executor/CoarseGrainedExecutorBackend.scala | 36 .../org/apache/spark/executor/Executor.scala| 4 +-- .../spark/executor/MesosExecutorBackend.scala | 14 .../spark/deploy/yarn/ApplicationMaster.scala | 10 +++--- .../spark/deploy/yarn/WorkerLauncher.scala | 6 +++- .../spark/deploy/yarn/ApplicationMaster.scala | 12 +++ .../spark/deploy/yarn/WorkerLauncher.scala | 5 ++- 8 files changed, 64 insertions(+), 40 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala -- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index ec15647..f31dd4e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -23,25 +23,36 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{Logging, SparkContext, SparkException} import scala.collection.JavaConversions._ /** * Contains util methods to interact with Hadoop from Spark. */ -class SparkHadoopUtil { +class SparkHadoopUtil extends Logging { val conf = newConfiguration() UserGroupInformation.setConfiguration(conf) - def runAsUser(user: String)(func: () = Unit) { + /** + * Runs the given function with a Hadoop UserGroupInformation as a thread local variable + * (distributed to child threads), used for authenticating HDFS and YARN calls. 
+ * + * IMPORTANT NOTE: If this function is going to be called repeated in the same process + * you need to look https://issues.apache.org/jira/browse/HDFS-3545 and possibly + * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems + */ + def runAsSparkUser(func: () = Unit) { +val user = Option(System.getenv(SPARK_USER)).getOrElse(SparkContext.SPARK_UNKNOWN_USER) if (user != SparkContext.SPARK_UNKNOWN_USER) { + logDebug(running as user: + user) val ugi = UserGroupInformation.createRemoteUser(user) transferCredentials(UserGroupInformation.getCurrentUser(), ugi) ugi.doAs(new PrivilegedExceptionAction[Unit] { def run: Unit = func() }) } else { + logDebug(running as SPARK_UNKNOWN_USER) func() } } http://git-wip-us.apache.org/repos/asf/spark/blob/45561cd9/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
git commit: SPARK-1587 Fix thread leak
Repository: spark Updated Branches: refs/heads/master bb68f4774 - dd681f502 SPARK-1587 Fix thread leak mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm. Author: Mridul Muralidharan mridul...@apache.org Closes #504 from mridulm/resource_leak_fixes and squashes the following commits: a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail 7b5e19c [Mridul Muralidharan] Prevent NPE while running tests Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd681f50 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd681f50 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd681f50 Branch: refs/heads/master Commit: dd681f502eafe39cfb8a5a62ea2d28016ac6013d Parents: bb68f47 Author: Mridul Muralidharan mridul...@apache.org Authored: Wed Apr 23 23:20:55 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Apr 23 23:20:55 2014 -0700 -- .../apache/spark/metrics/MetricsSystem.scala| 22 --- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../org/apache/spark/storage/BlockManager.scala | 2 ++ .../apache/spark/storage/DiskBlockManager.scala | 28 .../spark/storage/ShuffleBlockManager.scala | 4 +++ .../scala/org/apache/spark/ui/JettyUtils.scala | 1 + .../spark/storage/DiskBlockManagerSuite.scala | 5 7 files changed, 42 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index c5bda20..651511d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -129,17 +129,19 @@ private[spark] class MetricsSystem 
private (val instance: String, sinkConfigs.foreach { kv = val classPath = kv._2.getProperty(class) - try { -val sink = Class.forName(classPath) - .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) -if (kv._1 == servlet) { - metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) -} else { - sinks += sink.asInstanceOf[Sink] + if (null != classPath) { +try { + val sink = Class.forName(classPath) +.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) +.newInstance(kv._2, registry, securityMgr) + if (kv._1 == servlet) { +metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { +sinks += sink.asInstanceOf[Sink] + } +} catch { + case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } - } catch { -case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index be19d9b..5a68f38 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +starvationTimer.cancel() // sleeping for an arbitrary 1 seconds to ensure that messages are sent out. 
Thread.sleep(1000L) http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f15fa4d..ccd5c53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1021,6 +1021,8 @@ private[spark] class BlockManager( heartBeatTask.cancel() } connectionManager.stop() +shuffleBlockManager.stop() +diskBlockManager.stop()
git commit: SPARK-1587 Fix thread leak
Repository: spark Updated Branches: refs/heads/branch-1.0 e8907718a - 8684a15e5 SPARK-1587 Fix thread leak mvn test fails (intermittently) due to thread leak - since scalatest runs all tests in same vm. Author: Mridul Muralidharan mridul...@apache.org Closes #504 from mridulm/resource_leak_fixes and squashes the following commits: a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests : cleanup all threads when SparkContext.stop is invoked. Causes tests to fail 7b5e19c [Mridul Muralidharan] Prevent NPE while running tests (cherry picked from commit dd681f502eafe39cfb8a5a62ea2d28016ac6013d) Signed-off-by: Aaron Davidson aa...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8684a15e Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8684a15e Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8684a15e Branch: refs/heads/branch-1.0 Commit: 8684a15e50978480831d5f5e52684a61fe4ee7a6 Parents: e890771 Author: Mridul Muralidharan mridul...@apache.org Authored: Wed Apr 23 23:20:55 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Apr 23 23:21:16 2014 -0700 -- .../apache/spark/metrics/MetricsSystem.scala| 22 --- .../spark/scheduler/TaskSchedulerImpl.scala | 1 + .../org/apache/spark/storage/BlockManager.scala | 2 ++ .../apache/spark/storage/DiskBlockManager.scala | 28 .../spark/storage/ShuffleBlockManager.scala | 4 +++ .../scala/org/apache/spark/ui/JettyUtils.scala | 1 + .../spark/storage/DiskBlockManagerSuite.scala | 5 7 files changed, 42 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala -- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala index c5bda20..651511d 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala +++ 
b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala @@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String, sinkConfigs.foreach { kv = val classPath = kv._2.getProperty(class) - try { -val sink = Class.forName(classPath) - .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) - .newInstance(kv._2, registry, securityMgr) -if (kv._1 == servlet) { - metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) -} else { - sinks += sink.asInstanceOf[Sink] + if (null != classPath) { +try { + val sink = Class.forName(classPath) +.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager]) +.newInstance(kv._2, registry, securityMgr) + if (kv._1 == servlet) { +metricsServlet = Some(sink.asInstanceOf[MetricsServlet]) + } else { +sinks += sink.asInstanceOf[Sink] + } +} catch { + case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } - } catch { -case e: Exception = logError(Sink class + classPath + cannot be instantialized, e) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index be19d9b..5a68f38 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +starvationTimer.cancel() // sleeping for an arbitrary 1 seconds to ensure that messages are sent out. 
Thread.sleep(1000L) http://git-wip-us.apache.org/repos/asf/spark/blob/8684a15e/core/src/main/scala/org/apache/spark/storage/BlockManager.scala -- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index f15fa4d..ccd5c53 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
git commit: [SPARK-1385] Use existing code for JSON de/serialization of BlockId
Repository: spark Updated Branches: refs/heads/master 11973a7bd - de8eefa80 [SPARK-1385] Use existing code for JSON de/serialization of BlockId `BlockId.scala` offers a way to reconstruct a BlockId from a string through regex matching. `util/JsonProtocol.scala` duplicates this functionality by explicitly matching on the BlockId type. With this PR, the de/serialization of BlockIds will go through the first (older) code path. (Most of the line changes in this PR involve changing `==` to `===` in `JsonProtocolSuite.scala`) Author: Andrew Or andrewo...@gmail.com Closes #289 from andrewor14/blockid-json and squashes the following commits: 409d226 [Andrew Or] Simplify JSON de/serialization for BlockId Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de8eefa8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de8eefa8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de8eefa8 Branch: refs/heads/master Commit: de8eefa804e229635eaa29a78b9e9ce161ac58e1 Parents: 11973a7 Author: Andrew Or andrewo...@gmail.com Authored: Wed Apr 2 10:43:09 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Apr 2 10:43:09 2014 -0700 -- .../org/apache/spark/util/JsonProtocol.scala| 77 +- .../apache/spark/util/JsonProtocolSuite.scala | 141 +-- 2 files changed, 72 insertions(+), 146 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/de8eefa8/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala -- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 346f2b7..d9a6af6 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -195,7 +195,7 @@ private[spark] object JsonProtocol { taskMetrics.shuffleWriteMetrics.map(shuffleWriteMetricsToJson).getOrElse(JNothing) val updatedBlocks = 
taskMetrics.updatedBlocks.map { blocks = JArray(blocks.toList.map { case (id, status) = - (Block ID - blockIdToJson(id)) ~ + (Block ID - id.toString) ~ (Status - blockStatusToJson(status)) }) }.getOrElse(JNothing) @@ -284,35 +284,6 @@ private[spark] object JsonProtocol { (Replication - storageLevel.replication) } - def blockIdToJson(blockId: BlockId): JValue = { -val blockType = Utils.getFormattedClassName(blockId) -val json: JObject = blockId match { - case rddBlockId: RDDBlockId = -(RDD ID - rddBlockId.rddId) ~ -(Split Index - rddBlockId.splitIndex) - case shuffleBlockId: ShuffleBlockId = -(Shuffle ID - shuffleBlockId.shuffleId) ~ -(Map ID - shuffleBlockId.mapId) ~ -(Reduce ID - shuffleBlockId.reduceId) - case broadcastBlockId: BroadcastBlockId = -Broadcast ID - broadcastBlockId.broadcastId - case broadcastHelperBlockId: BroadcastHelperBlockId = -(Broadcast Block ID - blockIdToJson(broadcastHelperBlockId.broadcastId)) ~ -(Helper Type - broadcastHelperBlockId.hType) - case taskResultBlockId: TaskResultBlockId = -Task ID - taskResultBlockId.taskId - case streamBlockId: StreamBlockId = -(Stream ID - streamBlockId.streamId) ~ -(Unique ID - streamBlockId.uniqueId) - case tempBlockId: TempBlockId = -val uuid = UUIDToJson(tempBlockId.id) -Temp ID - uuid - case testBlockId: TestBlockId = -Test ID - testBlockId.id -} -(Type - blockType) ~ json - } - def blockStatusToJson(blockStatus: BlockStatus): JValue = { val storageLevel = storageLevelToJson(blockStatus.storageLevel) (Storage Level - storageLevel) ~ @@ -513,7 +484,7 @@ private[spark] object JsonProtocol { Utils.jsonOption(json \ Shuffle Write Metrics).map(shuffleWriteMetricsFromJson) metrics.updatedBlocks = Utils.jsonOption(json \ Updated Blocks).map { value = value.extract[List[JValue]].map { block = -val id = blockIdFromJson(block \ Block ID) +val id = BlockId((block \ Block ID).extract[String]) val status = blockStatusFromJson(block \ Status) (id, status) } @@ -616,50 +587,6 @@ private[spark] object JsonProtocol { 
StorageLevel(useDisk, useMemory, deserialized, replication) } - def blockIdFromJson(json: JValue): BlockId = { -val rddBlockId = Utils.getFormattedClassName(RDDBlockId) -val shuffleBlockId = Utils.getFormattedClassName(ShuffleBlockId) -val broadcastBlockId = Utils.getFormattedClassName(BroadcastBlockId) -val broadcastHelperBlockId = Utils.getFormattedClassName(BroadcastHelperBlockId) -val taskResultBlockId =
git commit: SPARK-1099:Spark's local mode should probably respect spark.cores.max by default
Repository: spark Updated Branches: refs/heads/master 67fa71cba - 16789317a SPARK-1099:Spark's local mode should probably respect spark.cores.max by default This is for JIRA:https://spark-project.atlassian.net/browse/SPARK-1099 And this is what I do in this patch (also commented in the JIRA) @aarondav This is really a behavioral change, so I do this with great caution, and welcome any review advice: 1. I changed how the MASTER=local pattern creates the LocalBackend. In the past, we passed 1 core to it; now it uses a default number of cores. The reason is that when someone uses spark-shell to start local mode, the REPL uses this MASTER=local pattern by default, so if one also specifies cores on the spark-shell command line, it all goes through here. Passing 1 core here is therefore not suitable for our change. 2. In the LocalBackend, the totalCores variable is fetched following a different rule (in the past it just took the user-passed cores, like 1 in the MASTER=local pattern and 2 in the MASTER=local[2] pattern). Rules: a. The second argument of LocalBackend's constructor, indicating cores, has a default value of Int.MaxValue. If the user didn't pass it, its initial value is Int.MaxValue. b. In getMaxCores, we first compare that value to Int.MaxValue. If it's not equal, we assume the user has passed their desired value, so we just use it. c. If b is not satisfied, we then get cores from spark.cores.max, and we get the real logical cores from Runtime. If the cores specified by spark.cores.max is larger than the logical cores, we use the logical cores; otherwise we use spark.cores.max. 3. In SparkContextSchedulerCreationSuite's test(local) case, the assertion is modified from 1 to the number of logical cores, because the MASTER=local pattern uses default values.
Author: qqsun8819 jin@alibaba-inc.com Closes #110 from qqsun8819/local-cores and squashes the following commits: 731aefa [qqsun8819] 1 LocalBackend not change 2 In SparkContext do some process to the cores and pass it to original LocalBackend constructor 78b9c60 [qqsun8819] 1 SparkContext MASTER=local pattern use default cores instead of 1 to construct LocalBackEnd , for use of spark-shell and cores specified in cmd line 2 some test case change from local to local[1]. 3 SparkContextSchedulerCreationSuite test spark.cores.max config in local pattern 6ae1ee8 [qqsun8819] Add a static function in LocalBackEnd to let it use spark.cores.max specified cores when no cores are passed to it Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16789317 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16789317 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16789317 Branch: refs/heads/master Commit: 16789317a34c1974f7b35960f06a7b51d8e0f29f Parents: 67fa71c Author: qqsun8819 jin@alibaba-inc.com Authored: Wed Mar 19 16:33:54 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Mar 19 16:33:54 2014 -0700 -- .../scala/org/apache/spark/SparkContext.scala| 5 - .../test/scala/org/apache/spark/FileSuite.scala | 4 ++-- .../SparkContextSchedulerCreationSuite.scala | 19 --- 3 files changed, 22 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/16789317/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index a1003b7..8f74607 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1262,7 +1262,10 @@ object SparkContext extends Logging { master match { case local = val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) 
-val backend = new LocalBackend(scheduler, 1) +// Use user specified in config, up to all available cores +val realCores = Runtime.getRuntime.availableProcessors() +val toUseCores = math.min(sc.conf.getInt(spark.cores.max, realCores), realCores) +val backend = new LocalBackend(scheduler, toUseCores) scheduler.initialize(backend) scheduler http://git-wip-us.apache.org/repos/asf/spark/blob/16789317/core/src/test/scala/org/apache/spark/FileSuite.scala -- diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index 01af940..b4a5881 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.SparkContext._ class FileSuite extends FunSuite with
git commit: SPARK-1160: Deprecate toArray in RDD
Repository: spark Updated Branches: refs/heads/master b8afe3052 - 9032f7c0d SPARK-1160: Deprecate toArray in RDD https://spark-project.atlassian.net/browse/SPARK-1160 reported by @mateiz: It's redundant with collect() and the name doesn't make sense in Java, where we return a List (we can't return an array due to the way Java generics work). It's also missing in Python. In this patch, I deprecated the method and changed the source files using it by replacing toArray with collect() directly Author: CodingCat zhunans...@gmail.com Closes #105 from CodingCat/SPARK-1060 and squashes the following commits: 286f163 [CodingCat] deprecate in JavaRDDLike ee17b4e [CodingCat] add message and since 2ff7319 [CodingCat] deprecate toArray in RDD Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9032f7c0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9032f7c0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9032f7c0 Branch: refs/heads/master Commit: 9032f7c0d5f1ae7985a20d54ca04c297201aae85 Parents: b8afe30 Author: CodingCat zhunans...@gmail.com Authored: Wed Mar 12 17:43:12 2014 -0700 Committer: Aaron Davidson aa...@databricks.com Committed: Wed Mar 12 17:43:12 2014 -0700 -- .../src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 1 + .../src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 1 + core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala | 4 ++-- .../src/main/scala/org/apache/spark/examples/SparkALS.scala| 4 ++-- .../main/scala/org/apache/spark/examples/mllib/SparkSVD.scala | 2 +- mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 4 ++-- .../test/scala/org/apache/spark/mllib/linalg/SVDSuite.scala| 6 +++--- 8 files changed, 13 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/9032f7c0/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala -- diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index a89419b..3df68d4 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -283,6 +283,7 @@ trait JavaRDDLike[T, This : JavaRDDLike[T, This]] extends Serializable { /** * Return an array that contains all of the elements in this RDD. */ + @deprecated(use collect, 1.0.0) def toArray(): JList[T] = collect() /** http://git-wip-us.apache.org/repos/asf/spark/blob/9032f7c0/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 2384c8f..b20ed99 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -423,7 +423,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Return the key-value pairs in this RDD to the master as a Map. */ def collectAsMap(): Map[K, V] = { -val data = self.toArray() +val data = self.collect() val map = new mutable.HashMap[K, V] map.sizeHint(data.length) data.foreach { case (k, v) = map.put(k, v) } http://git-wip-us.apache.org/repos/asf/spark/blob/9032f7c0/core/src/main/scala/org/apache/spark/rdd/RDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4afa752..b50c996 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -658,6 +658,7 @@ abstract class RDD[T: ClassTag]( /** * Return an array that contains all of the elements in this RDD. 
*/ + @deprecated(use collect, 1.0.0) def toArray(): Array[T] = collect() /** http://git-wip-us.apache.org/repos/asf/spark/blob/9032f7c0/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala -- diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala index b50307c..4ceea55 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala @@ -26,13 +26,13 @@ import cern.jet.random.engine.DRand import org.apache.spark.{Partition, TaskContext} -@deprecated(Replaced by