git commit: SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown

2014-06-22 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 4881fc62d -> 64316af5a


SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM 
shutdown

Tobias noted today on the mailing list:



I am trying to use Spark Streaming with Kafka, which works like a
charm – except for shutdown. When I run my program with "sbt
run-main", sbt will never exit, because there are two non-daemon
threads left that don't die.
I created a minimal example at
.
It starts a StreamingContext and does nothing more than connecting to
a Kafka server and printing what it receives. Using the `future { ... }`
construct, I shut down the StreamingContext after some seconds and
then print the difference between the threads at start time and at end
time. The output can be found at
.
There are a number of threads remaining that will prevent sbt from
exiting.
When I replace `KafkaUtils.createStream(...)` with a call that does
exactly the same, except that it calls `consumerConnector.shutdown()`
in `KafkaReceiver.onStop()` (which it should, IMO), the output is as
shown at .
Does anyone have any idea what is going on here and why the program
doesn't shut down properly? The behavior is the same with both kafka
0.8.0 and 0.8.1.1, by the way.



Something similar was noted last year:

http://mail-archives.apache.org/mod_mbox/spark-dev/201309.mbox/%3c1380220041.2428.yahoomail...@web160804.mail.bf1.yahoo.com%3E
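The thread-diff check Tobias describes can be sketched roughly as follows. This is an illustrative reconstruction, not his actual example; `ThreadDiff` and its method names are made up:

```scala
import scala.collection.JavaConverters._

object ThreadDiff {
  // Snapshot of all live threads in the JVM
  def liveThreads(): Set[Thread] =
    Thread.getAllStackTraces.keySet.asScala.toSet

  // Threads that appeared since `before` and would keep the JVM alive:
  // only non-daemon threads block JVM shutdown
  def nonDaemonLeftover(before: Set[Thread]): Set[Thread] =
    (liveThreads() -- before).filterNot(_.isDaemon)
}

// Usage, mirroring the check in the report:
//   val before = ThreadDiff.liveThreads()
//   ... start and stop the StreamingContext ...
//   ThreadDiff.nonDaemonLeftover(before).foreach(t => println(t.getName))
```

If the receiver cleans up correctly, the leftover set should be empty after `StreamingContext.stop()`.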

KafkaInputDStream doesn't close `ConsumerConnector` in `onStop()`, and does not 
close the `Executor` it creates. The latter leaves non-daemon threads and can 
prevent the JVM from shutting down even if streaming is closed properly.
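The shape of the fix can be sketched as follows. This is a simplified illustration, not the actual Spark patch: `ConsumerConnector` here is a stand-in trait for Kafka's real interface, and `SketchReceiver` condenses the receiver down to the two resources at issue:

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

// Stand-in for Kafka's ConsumerConnector (assumption: the real interface
// in kafka.consumer also exposes shutdown()).
trait ConsumerConnector { def shutdown(): Unit }

class SketchReceiver(topics: Map[String, Int]) {
  @volatile private var consumerConnector: ConsumerConnector = null
  @volatile private var executorPool: ExecutorService = null

  def onStart(connector: ConsumerConnector): Unit = {
    consumerConnector = connector
    // One thread per requested partition, as in KafkaInputDStream;
    // these pool threads are non-daemon by default
    executorPool = Executors.newFixedThreadPool(topics.values.sum)
  }

  // The essence of the fix: release both resources on stop, so no
  // non-daemon threads outlive the receiver
  def onStop(): Unit = {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }
    if (executorPool != null) {
      executorPool.shutdown()
      executorPool.awaitTermination(10, TimeUnit.SECONDS)
      executorPool = null
    }
  }

  def stopped: Boolean = consumerConnector == null && executorPool == null
}
```

Without the `executorPool.shutdown()`, the fixed thread pool's idle workers linger and keep the JVM alive, which is exactly the symptom reported above.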

Author: Sean Owen 

Closes #980 from srowen/SPARK-2034 and squashes the following commits:

9f31a8d [Sean Owen] Restore ClassTag to private class because MIMA flags it; is 
the shadowing intended?
2d579a8 [Sean Owen] Close ConsumerConnector in onStop; shutdown() the local 
Executor that is created so that its threads stop when done; close the 
Zookeeper client even on exception; fix a few typos; log exceptions that 
otherwise vanish
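The "close the Zookeeper client even on exception" item in that list boils down to a try/finally pattern, sketched here with a hypothetical helper (`Closing.withClosable` is illustrative, not Spark code):

```scala
object Closing {
  // Run `body` with `resource`, closing the resource in a finally block
  // so it is released no matter how the body exits
  def withClosable[A <: AutoCloseable, B](resource: A)(body: A => B): B =
    try body(resource)
    finally resource.close()
}
```

The same discipline applies to any client handle opened in `onStart`: an exception mid-setup must not leak the connection.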
(cherry picked from commit 476581e8c8ca03a5940c404fee8a06361ff94cb5)

Signed-off-by: Patrick Wendell 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/64316af5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/64316af5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/64316af5

Branch: refs/heads/branch-1.0
Commit: 64316af5a29f77753d3bd9dab7b0b9b4e1dd5592
Parents: 4881fc6
Author: Sean Owen 
Authored: Sun Jun 22 01:12:15 2014 -0700
Committer: Patrick Wendell 
Committed: Sun Jun 22 01:12:26 2014 -0700

--
 .../streaming/kafka/KafkaInputDStream.scala | 55 
 1 file changed, 33 insertions(+), 22 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/64316af5/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
--
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
index 21443eb..38095e8 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala
@@ -38,7 +38,7 @@ import org.apache.spark.streaming.receiver.Receiver
 /**
  * Input stream that pulls messages from a Kafka Broker.
  *
- * @param kafkaParams Map of kafka configuration paramaters.
+ * @param kafkaParams Map of kafka configuration parameters.
  *See: http://kafka.apache.org/configuration.html
  * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
  * in its own thread.
@@ -76,29 +76,31 @@ class KafkaReceiver[
   // Connection to Kafka
   var consumerConnector : ConsumerConnector = null
 
-  def onStop() { }
+  def onStop() {
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+    }
+  }
 
   def onStart() {
 
-    // In case we are using multiple Threads to handle Kafka Messages
-    val executorPool = Executors.newFixedThreadPool(topics.values.reduce(_ + _))
-
     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))
 
     // Kafka connection properties
     val props = new Properties()
     kafkaParams.foreach(param => props.put(param._1, param._2))
 
+    val zkConnect = kafkaParams("zookeeper.connect")
     // Create the connection to the cluster
-    logInfo("Connecting to Zookeper: " + kafkaParams("zookeeper.connect")

git commit: SPARK-2034. KafkaInputDStream doesn't close resources and may prevent JVM shutdown

2014-06-22 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master 58b32f347 -> 476581e8c


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/476581e8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/476581e8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/476581e8

Branch: refs/heads/master
Commit: 476581e8c8ca03a5940c404fee8a06361ff94cb5
Parents: 58b32f3
Author: Sean Owen 
Authored: Sun Jun 22 01:12:15 2014 -0700
Committer: Patrick Wendell 
Committed: Sun Jun 22 01:12:15 2014 -0700

--
 .../streaming/kafka/KafkaInputDStream.scala | 55 
 1 file changed, 33 insertions(+), 22 deletions(-)
--

