Repository: spark
Updated Branches:
  refs/heads/master 9fd7a2f02 -> 962aac4db


[SPARK-12513][STREAMING] SocketReceiver hang in Netcat example

Explicitly close client side socket connection before restart socket receiver.

Author: guoxu1231 <guoxu1...@gmail.com>
Author: Shawn Guo <guoxu1...@gmail.com>

Closes #10464 from guoxu1231/SPARK-12513.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/962aac4d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/962aac4d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/962aac4d

Branch: refs/heads/master
Commit: 962aac4db99f3988c07ccb23439327c18ec178f1
Parents: 9fd7a2f
Author: guoxu1231 <guoxu1...@gmail.com>
Authored: Mon Jan 4 14:23:07 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Jan 4 14:23:07 2016 +0000

----------------------------------------------------------------------
 .../streaming/dstream/SocketInputDStream.scala  | 38 ++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/962aac4d/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
index 10644b9..e70fc87 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/SocketInputDStream.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming.dstream
 
 import java.io._
-import java.net.{Socket, UnknownHostException}
+import java.net.{ConnectException, Socket}
 
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
@@ -51,7 +51,20 @@ class SocketReceiver[T: ClassTag](
     storageLevel: StorageLevel
   ) extends Receiver[T](storageLevel) with Logging {
 
+  private var socket: Socket = _
+
   def onStart() {
+
+    logInfo(s"Connecting to $host:$port")
+    try {
+      socket = new Socket(host, port)
+    } catch {
+      case e: ConnectException =>
+        restart(s"Error connecting to $host:$port", e)
+        return
+    }
+    logInfo(s"Connected to $host:$port")
+
     // Start the thread that receives data over a connection
     new Thread("Socket Receiver") {
       setDaemon(true)
@@ -60,20 +73,22 @@ class SocketReceiver[T: ClassTag](
   }
 
   def onStop() {
-    // There is nothing much to do as the thread calling receive()
-    // is designed to stop by itself isStopped() returns false
+    // in case restart thread close it twice
+    synchronized {
+      if (socket != null) {
+        socket.close()
+        socket = null
+        logInfo(s"Closed socket to $host:$port")
+      }
+    }
   }
 
   /** Create a socket connection and receive data until receiver is stopped */
   def receive() {
-    var socket: Socket = null
     try {
-      logInfo("Connecting to " + host + ":" + port)
-      socket = new Socket(host, port)
-      logInfo("Connected to " + host + ":" + port)
       val iterator = bytesToObjects(socket.getInputStream())
       while(!isStopped && iterator.hasNext) {
-        store(iterator.next)
+        store(iterator.next())
       }
       if (!isStopped()) {
         restart("Socket data stream had no more data")
@@ -81,16 +96,11 @@ class SocketReceiver[T: ClassTag](
         logInfo("Stopped receiving")
       }
     } catch {
-      case e: java.net.ConnectException =>
-        restart("Error connecting to " + host + ":" + port, e)
       case NonFatal(e) =>
         logWarning("Error receiving data", e)
         restart("Error receiving data", e)
     } finally {
-      if (socket != null) {
-        socket.close()
-        logInfo("Closed socket to " + host + ":" + port)
-      }
+      onStop()
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to