git commit: SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown
Repository: spark
Updated Branches:
  refs/heads/master 58b32f347 -> 476581e8c


SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown

Tobias noted today on the mailing list:

I am trying to use Spark Streaming with Kafka, which works like a charm - except for shutdown. When I run my program with `sbt run-main`, sbt will never exit, because there are two non-daemon threads left that don't die.

I created a minimal example at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala. It starts a StreamingContext and does nothing more than connecting to a Kafka server and printing what it receives. Using the `future { ... }` construct, I shut down the StreamingContext after some seconds and then print the difference between the threads at start time and at end time. The output can be found at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1. There are a number of threads remaining that will prevent sbt from exiting.

When I replace `KafkaUtils.createStream(...)` with a call that does exactly the same, except that it calls `consumerConnector.shutdown()` in `KafkaReceiver.onStop()` (which it should, IMO), the output is as shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2.

Does anyone have any idea what is going on here and why the program doesn't shut down properly? The behavior is the same with both kafka 0.8.0 and 0.8.1.1, by the way.

Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3c1380220041.2428.yahoomail...@web160804.mail.bf1.yahoo.com%3E

KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not close the `Executor` it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly.
Author: Sean Owen <so...@cloudera.com>

Closes #980 from srowen/SPARK-2034 and squashes the following commits:

9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is the shadowing intended?
2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local Executor that is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/476581e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/476581e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/476581e8

Branch: refs/heads/master
Commit: 476581e8c8ca03a5940c404fee8a06361ff94cb5
Parents: 58b32f3
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Jun 22 01:12:15 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sun Jun 22 01:12:15 2014 -0700

----------------------------------------------------------------------
 .../streaming/kafka/KafkaInputDStream.scala     | 55
 1 file changed, 33 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/476581e8/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 21443eb..38095e8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
  *                    See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
@@ -76,29 +76,31 @@ class KafkaReceiver[
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null
 
-  def onStop() { }
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }
 
   def onStart() {
-    // In case we are using multiple Threads to handle Kafka Messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
 
     // Kafka connection properties
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))
 
+    val zkConnect = kafkaParams("zookeeper.connect")
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper:
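The shutdown problem reported above comes down to two lifecycle rules. First, threads created by `Executors.newFixedThreadPool` are non-daemon by default, so a pool that is never shut down can keep the JVM running. Second, a connection opened in `onStart()` should be released in `onStop()`.

The Scala sketch below illustrates both rules. It uses hypothetical names (`Connector`, `DemoReceiver`, `awaitHandlers`), does not use Kafka, and is not the actual `KafkaReceiver` code from the patch. It shuts down the handler pool in a `finally` block, and `onStop()` guards against a connector that was never created.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// Hypothetical stand-in for a resource such as Kafka's ConsumerConnector.
trait Connector {
  def shutdown(): Unit
}

// Hypothetical receiver with the same onStart()/onStop() shape as KafkaReceiver.
class DemoReceiver(numThreads: Int, connect: () => Connector, work: Int => Unit) {
  @volatile private var connector: Connector = null
  // Kept only so callers can wait for the handler threads to finish.
  @volatile private var handlerPool: ExecutorService = null

  def onStart(): Unit = {
    connector = connect()
    // newFixedThreadPool uses non-daemon threads, so a pool that is never
    // shut down can keep the JVM alive after everything else has stopped.
    val pool = Executors.newFixedThreadPool(numThreads)
    handlerPool = pool
    try {
      for (i <- 0 until numThreads) {
        pool.submit(new Runnable { def run(): Unit = work(i) })
      }
    } finally {
      // Accept no new tasks; already-submitted tasks still run, and the
      // pool's threads exit once they are done.
      pool.shutdown()
    }
  }

  def onStop(): Unit = {
    // onStop() may run even if onStart() never created the connector.
    if (connector != null) {
      connector.shutdown()
      connector = null
    }
  }

  // Helper for this sketch: wait up to `seconds` for the handler threads to exit.
  def awaitHandlers(seconds: Long): Boolean =
    handlerPool == null || handlerPool.awaitTermination(seconds, TimeUnit.SECONDS)
}
```

In this sketch each work item returns on its own, so the pool's threads exit as soon as they finish. Calling `shutdown()` rather than `shutdownNow()` lets already-submitted work complete instead of interrupting it.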
git commit: SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 4881fc62d -> 64316af5a


SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown

Tobias noted today on the mailing list:

I am trying to use Spark Streaming with Kafka, which works like a charm - except for shutdown. When I run my program with `sbt run-main`, sbt will never exit, because there are two non-daemon threads left that don't die.

I created a minimal example at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-kafkadoesntshutdown-scala. It starts a StreamingContext and does nothing more than connecting to a Kafka server and printing what it receives. Using the `future { ... }` construct, I shut down the StreamingContext after some seconds and then print the difference between the threads at start time and at end time. The output can be found at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output1. There are a number of threads remaining that will prevent sbt from exiting.

When I replace `KafkaUtils.createStream(...)` with a call that does exactly the same, except that it calls `consumerConnector.shutdown()` in `KafkaReceiver.onStop()` (which it should, IMO), the output is as shown at https://gist.github.com/tgpfeiffer/b1e765064e983449c6b6#file-output2.

Does anyone have any idea what is going on here and why the program doesn't shut down properly? The behavior is the same with both kafka 0.8.0 and 0.8.1.1, by the way.

Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3c1380220041.2428.yahoomail...@web160804.mail.bf1.yahoo.com%3E

KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not close the `Executor` it creates. The latter leaves non-daemon threads and can prevent the JVM from shutting down even if streaming is closed properly.
Author: Sean Owen <so...@cloudera.com>

Closes #980 from srowen/SPARK-2034 and squashes the following commits:

9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is the shadowing intended?
2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local Executor that is created so that its threads stop when done; close the Zookeeper client even on exception; fix a few typos; log exceptions that otherwise vanish

(cherry picked from commit 476581e8c8ca03a5940c404fee8a06361ff94cb5)
Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64316af5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64316af5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64316af5

Branch: refs/heads/branch-1.0
Commit: 64316af5a29f77753d3bd9dab7b0b9b4e1dd5592
Parents: 4881fc6
Author: Sean Owen <so...@cloudera.com>
Authored: Sun Jun 22 01:12:15 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sun Jun 22 01:12:26 2014 -0700

----------------------------------------------------------------------
 .../streaming/kafka/KafkaInputDStream.scala     | 55
 1 file changed, 33 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/64316af5/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 21443eb..38095e8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
  *                    See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
@@ -76,29 +76,31 @@ class KafkaReceiver[
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null
 
-  def onStop() { }
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }
 
   def onStart() {
-    // In case we are using multiple Threads to handle Kafka Messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
 
     // Kafka connection properties
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))
 
+
git commit: [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 64316af5a -> 67bffd3c7


[SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.

SPARK-1112: This is a more conservative version of #1132 that doesn't change around the actor system initialization on the executor. Instead we just directly read the current frame size limit from the ActorSystem.

SPARK-2156: This uses the same fix as in #1132.

Author: Patrick Wendell <pwend...@gmail.com>

Closes #1172 from pwendell/akka-10-fix and squashes the following commits:

d56297e [Patrick Wendell] Set limit in LocalBackend to preserve test expectations
9f5ed19 [Patrick Wendell] [SPARK-1112, 2156] (1.0 edition) Use correct akka frame size and overhead amounts.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/67bffd3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/67bffd3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/67bffd3c

Branch: refs/heads/branch-1.0
Commit: 67bffd3c7ee8e9e3395e714e470459f09d19e66d
Parents: 64316af
Author: Patrick Wendell <pwend...@gmail.com>
Authored: Sun Jun 22 19:31:15 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Sun Jun 22 19:31:15 2014 -0700

----------------------------------------------------------------------
 .../executor/CoarseGrainedExecutorBackend.scala |  8 ++--
 .../org/apache/spark/executor/Executor.scala    |  8 +++-
 .../apache/spark/executor/ExecutorBackend.scala |  3 +++
 .../cluster/CoarseGrainedSchedulerBackend.scala |  2 +-
 .../spark/scheduler/local/LocalBackend.scala    |  6 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |  3 +++
 .../apache/spark/MapOutputTrackerSuite.scala    | 21 +++-
 7 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2279d77..70c1f4c 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
-    cores: Int)
+    cores: Int,
+    actorSystem: ActorSystem)
   extends Actor
   with ExecutorBackend
   with Logging {
@@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     driver ! StatusUpdate(executorId, taskId, state, data)
   }
+
+  override def akkaFrameSize() = actorSystem.settings.config.getBytes(
+    "akka.remote.netty.tcp.maximum-frame-size")
 }
 
 private[spark] object CoarseGrainedExecutorBackend {
@@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend {
       val sparkHostPort = hostname + ":" + boundPort
       actorSystem.actorOf(
         Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
-          sparkHostPort, cores),
+          sparkHostPort, cores, actorSystem),
         name = "Executor")
       workerUrl.foreach {
         url =>


http://git-wip-us.apache.org/repos/asf/spark/blob/67bffd3c/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a2..214a8c8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -97,10 +97,6 @@ private[spark] class Executor(
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
 
-  // Akka's message frame size. If task result is bigger than this, we use the block manager
-  // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
 
@@ -211,8 +207,10 @@ private[spark] class Executor(
           task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
+
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
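The `Executor` change above stops computing the frame size from `SparkConf`. Instead it uses the limit actually configured on the running `ActorSystem` (`akka.remote.netty.tcp.maximum-frame-size`), and compares the serialized task result against that limit minus some overhead. A result that does not fit is sent back through the block manager rather than directly.

The routing decision is a single comparison, and the sketch below isolates it. The names `ResultRoute`, `DirectResult`, `ViaBlockManager`, `ResultRouting.route` and the `reservedBytes` parameter are hypothetical. The diff is truncated before the new overhead amount, so the 1024-byte value in the usage is only illustrative.

```scala
// Hypothetical types describing how a serialized task result is shipped.
sealed trait ResultRoute
case object DirectResult extends ResultRoute    // small enough for one Akka message
case object ViaBlockManager extends ResultRoute // too large: stored and fetched separately

object ResultRouting {
  /** Choose a route for a serialized result.
    *
    * @param resultSize    size of the serialized result in bytes
    * @param frameSize     maximum frame size in bytes, e.g. read from the running ActorSystem
    * @param reservedBytes headroom left for message overhead (an assumed value)
    */
  def route(resultSize: Long, frameSize: Long, reservedBytes: Long): ResultRoute =
    if (resultSize >= frameSize - reservedBytes) ViaBlockManager else DirectResult
}
```

With a 10 MB frame and 1024 reserved bytes, a result of exactly `frameSize - 1024` bytes already goes through the block manager, matching the `>=` comparison in the diff.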
git commit: SPARK-2241: quote command line args in ec2 script
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 67bffd3c7 -> dedd70903


SPARK-2241: quote command line args in ec2 script

To preserve quoted command line args (in case options have space in them).

Author: Ori Kremer <ori.kre...@gmail.com>

Closes #1169 from orikremer/quote_cmd_line_args and squashes the following commits:

67e2aa1 [Ori Kremer] quote command line args

(cherry picked from commit 9fc373e3a9a8ba7bea9df0950775f48918f63a8a)
Signed-off-by: Patrick Wendell <pwend...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dedd7090
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dedd7090
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dedd7090

Branch: refs/heads/branch-1.0
Commit: dedd70903233df78d6b937c1b98ac5f47f7fcf23
Parents: 67bffd3
Author: Ori Kremer <ori.kre...@gmail.com>
Authored: Sun Jun 22 20:21:23 2014 -0700
Committer: Patrick Wendell <pwend...@gmail.com>
Committed: Sun Jun 22 20:24:08 2014 -0700

----------------------------------------------------------------------
 ec2/spark-ec2 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dedd7090/ec2/spark-ec2
----------------------------------------------------------------------
diff --git a/ec2/spark-ec2 b/ec2/spark-ec2
index 454057a..31f9771 100755
--- a/ec2/spark-ec2
+++ b/ec2/spark-ec2
@@ -19,4 +19,4 @@
 #
 
 cd `dirname $0`
-PYTHONPATH=./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH python ./spark_ec2.py $@
+PYTHONPATH=./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH python ./spark_ec2.py "$@"
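Quoting matters here because of field splitting. An unquoted `$@` is split again on whitespace, so an argument such as `a b` reaches `spark_ec2.py` as two arguments. `"$@"` instead passes each original argument through as exactly one word.

The Scala model below is written in Scala to match the other examples. It is a deliberately simplified sketch: it assumes default whitespace splitting, ignores custom `IFS` values and pathname expansion, and uses hypothetical names (`ArgForwarding`, `unquoted`, `quoted`) and sample arguments.

```scala
object ArgForwarding {
  // Rough model of unquoted $@: every argument is split again on whitespace,
  // and words that end up empty are dropped.
  def unquoted(args: Seq[String]): Seq[String] =
    args.flatMap(_.split("\\s+").filter(_.nonEmpty).toSeq)

  // Model of "$@": every argument is forwarded unchanged as a single word.
  def quoted(args: Seq[String]): Seq[String] = args
}
```

For the hypothetical input `Seq("--opt", "value with spaces", "")`, the unquoted model yields four words and loses the empty argument. The quoted model returns the original three arguments.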