BryanCutler commented on a change in pull request #24834:
[WIP][SPARK-27992][PYTHON] Synchronize with Python connection thread to
propagate errors
URL: https://github.com/apache/spark/pull/24834#discussion_r295999254
##########
File path: core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
##########
@@ -66,42 +87,45 @@ private[spark] abstract class SocketAuthServer[T](
}
+/**
+ * Create a socket server class and run user function on the socket in a
background thread.
+ * This is the same as calling SocketAuthServer.setupOneConnectionServer
except it creates
+ * a server object that can then be synced from Python.
+ */
+private[spark] class SocketFuncServer(
+ authHelper: SocketAuthHelper,
+ threadName: String,
+ func: Socket => Unit) extends SocketAuthServer[Unit](authHelper,
threadName) {
+
+ override def handleConnection(sock: Socket): Unit = {
+ func(sock)
+ }
+}
+
private[spark] object SocketAuthServer {
/**
- * Create a socket server and run user function on the socket in a
background thread.
- *
- * The socket server can only accept one connection, or close if no
connection
- * in 15 seconds.
+ * Convenience function to create a socket server and run a user function in
a background
+ * thread to write to an output stream.
*
- * The thread will terminate after the supplied user function, or if there
are any exceptions.
- *
- * If you need to get a result of the supplied function, create a subclass
of [[SocketAuthServer]]
- *
- * @return The port number of a local socket and the secret for
authentication.
+ * @param threadName Name for the background serving thread.
+ * @param authHelper SocketAuthHelper for authentication
+ * @param writeFunc User function to write to a given OutputStream
+ * @return
*/
- def setupOneConnectionServer(
- authHelper: SocketAuthHelper,
- threadName: String)
- (func: Socket => Unit): (Int, String) = {
- val serverSocket = new ServerSocket(0, 1,
InetAddress.getByAddress(Array(127, 0, 0, 1)))
- // Close the socket if no connection in 15 seconds
- serverSocket.setSoTimeout(15000)
-
- new Thread(threadName) {
- setDaemon(true)
- override def run(): Unit = {
- var sock: Socket = null
- try {
- sock = serverSocket.accept()
- authHelper.authClient(sock)
- func(sock)
- } finally {
- JavaUtils.closeQuietly(serverSocket)
- JavaUtils.closeQuietly(sock)
- }
+ def serveToStream(
Review comment:
Moved this here from `SocketAuthHelper` because it seemed like a better fit.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org