Github user dbtsai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22085#discussion_r212762076
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala ---
    @@ -180,7 +188,73 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             dataOut.writeInt(partitionIndex)
             // Python version of driver
             PythonRDD.writeUTF(pythonVer, dataOut)
    +        // Init a ServerSocket to accept method calls from Python side.
    +        val isBarrier = context.isInstanceOf[BarrierTaskContext]
    +        if (isBarrier) {
    +          serverSocket = Some(new ServerSocket(/* port */ 0,
    +            /* backlog */ 1,
    +            InetAddress.getByName("localhost")))
    +          // A call to accept() for ServerSocket shall block infinitely.
    +          serverSocket.map(_.setSoTimeout(0))
    +          new Thread("accept-connections") {
    +            setDaemon(true)
    +
    +            override def run(): Unit = {
    +              while (!serverSocket.get.isClosed()) {
    +                var sock: Socket = null
    +                try {
    +                  sock = serverSocket.get.accept()
    +                  // Wait for function call from python side.
    +                  sock.setSoTimeout(10000)
    +                  val input = new DataInputStream(sock.getInputStream())
    +                  input.readInt() match {
    +                    case BarrierTaskContextMessageProtocol.BARRIER_FUNCTION =>
    +                      // The barrier() function may wait infinitely, socket shall not timeout
    +                      // before the function finishes.
    +                      sock.setSoTimeout(0)
    +                      barrierAndServe(sock)
    +
    +                    case _ =>
    +                      val out = new DataOutputStream(new BufferedOutputStream(
    +                        sock.getOutputStream))
    +                      writeUTF(BarrierTaskContextMessageProtocol.ERROR_UNRECOGNIZED_FUNCTION, out)
    +                  }
    +                } catch {
    +                  case e: SocketException if e.getMessage.contains("Socket closed") =>
    +                    // It is possible that the ServerSocket is not closed, but the native socket
    +                    // has already been closed, we shall catch and silently ignore this case.
    +                } finally {
    +                  if (sock != null) {
    +                    sock.close()
    +                  }
    +                }
    +              }
    +            }
    +          }.start()
    +        }
    +        val secret = if (isBarrier) {
    +          authHelper.secret
    +        } else {
    +          ""
    +        }
    +        // Close ServerSocket on task completion.
    +        serverSocket.foreach { server =>
    +          context.addTaskCompletionListener(_ => server.close())
    --- End diff --
    
    Addressed in https://github.com/apache/spark/pull/22229


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to