HyukjinKwon commented on a change in pull request #30389:
URL: https://github.com/apache/spark/pull/30389#discussion_r528432296



##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.mockito.Mockito.mock
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}

Review comment:
       ```suggestion
  import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
   ```

##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.mockito.Mockito.mock
 

Review comment:
       ```suggestion
   import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
   
   ```

##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.mockito.Mockito.mock
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv, SparkFunSuite}
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.{HadoopRDD, RDD}
 import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
 import org.apache.spark.util.Utils
 
 class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {
 
+  private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
+
   var tempDir: File = _

Review comment:
       ```suggestion
   
     var tempDir: File = _
   ```
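For context on the `doReturn` helper in this diff: Mockito's Java API declares two overloads, `doReturn(Object)` and `doReturn(Object, Object...)`, and a single-argument call from Scala is ambiguous between them, so passing an empty varargs sequence pins the second overload. A minimal, self-contained sketch of the same trick (the `Config` trait is hypothetical, only there to give us something to mock):

```scala
import org.mockito.Mockito.mock

object DoReturnSketch {
  // Pin Mockito's varargs overload doReturn(Object, Object...); a plain
  // doReturn(value) call is ambiguous from Scala between the two overloads.
  private def doReturn(value: Any) =
    org.mockito.Mockito.doReturn(value, Seq.empty: _*)

  // Hypothetical trait, only here for illustration.
  trait Config { def timeoutSeconds: Int }

  def main(args: Array[String]): Unit = {
    val conf = mock(classOf[Config])
    doReturn(30).when(conf).timeoutSeconds
    assert(conf.timeoutSeconds == 30)
  }
}
```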

##########
File path: core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
##########
@@ -34,31 +36,38 @@ import org.apache.spark.util.{ThreadUtils, Utils}
  * handling one batch of data, with authentication and error handling.
  *
  * The socket server can only accept one connection, or close if no connection
- * in 15 seconds.
+ * in a configurable number of seconds (default 15).
  */
 private[spark] abstract class SocketAuthServer[T](
     authHelper: SocketAuthHelper,
-    threadName: String) {
+    threadName: String) extends Logging {
 
   def this(env: SparkEnv, threadName: String) = this(new SocketAuthHelper(env.conf), threadName)
   def this(threadName: String) = this(SparkEnv.get, threadName)
 
   private val promise = Promise[T]()
 
   private def startServer(): (Int, String) = {
+    logTrace("Creating listening socket")
+    val serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
-    // Close the socket if no connection in 15 seconds
-    serverSocket.setSoTimeout(15000)
+    // Close the socket if no connection in configured seconds

Review comment:
       ```suggestion
       // Close the socket if no connection in the configured seconds
   ```
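The change under review replaces the hard-coded 15-second `setSoTimeout(15000)` with a value read from configuration. Below is a minimal standalone sketch of the same accept-timeout pattern; `acceptOne` and `timeoutSeconds` are illustrative names, not the PR's actual config plumbing:

```scala
import java.net.{InetAddress, ServerSocket, SocketTimeoutException}

object AcceptTimeoutSketch {
  // Bind an ephemeral port on loopback with a backlog of 1, then wait a
  // bounded time for a single client, as SocketAuthServer does.
  def acceptOne(timeoutSeconds: Int): Boolean = {
    val serverSocket =
      new ServerSocket(0, 1, InetAddress.getByAddress(Array[Byte](127, 0, 0, 1)))
    // accept() throws SocketTimeoutException once the SO timeout elapses,
    // which is how "close if no connection in N seconds" is implemented.
    serverSocket.setSoTimeout(timeoutSeconds * 1000)
    try {
      val sock = serverSocket.accept()
      sock.close() // a real server would hand the socket to a handler
      true
    } catch {
      case _: SocketTimeoutException => false // nobody connected in time
    } finally {
      serverSocket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    // With a 1-second window and no client, this prints "connected: false".
    println(s"connected: ${acceptOne(1)}")
  }
}
```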

##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -76,12 +79,22 @@ class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("python server error handling") {
-    val authHelper = new SocketAuthHelper(new SparkConf())
-    val errorServer = new ExceptionPythonServer(authHelper)
-    val client = new Socket(InetAddress.getLoopbackAddress(), errorServer.port)
-    authHelper.authToServer(client)
-    val ex = intercept[Exception] { errorServer.getResult(Duration(1, "second")) }
-    assert(ex.getCause().getMessage().contains("exception within handleConnection"))
+    val savedSparkEnv = SparkEnv.get
+    try {
+      val conf = new SparkConf()
+      val env = mock(classOf[SparkEnv])
+      doReturn(conf).when(env).conf
+      SparkEnv.set(env)
+
+      val authHelper = new SocketAuthHelper(conf)
+      val errorServer = new ExceptionPythonServer(authHelper)
+      val client = new Socket(InetAddress.getLoopbackAddress(), errorServer.port)
+      authHelper.authToServer(client)
+      val ex = intercept[Exception] { errorServer.getResult(Duration(1, "second")) }
+      assert(ex.getCause().getMessage().contains("exception within handleConnection"))
+    } finally {
+      SparkEnv.set(savedSparkEnv)
+    }
   }

Review comment:
       ```suggestion
       val authHelper = new SocketAuthHelper(new SparkConf())
       val errorServer = new ExceptionPythonServer(authHelper)
      val client = new Socket(InetAddress.getLoopbackAddress(), errorServer.port)
      authHelper.authToServer(client)
      val ex = intercept[Exception] { errorServer.getResult(Duration(1, "second")) }
      assert(ex.getCause().getMessage().contains("exception within handleConnection"))
     }
   ```
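Why the test mocks `SparkEnv` at all: with the timeout now read from configuration, the `SparkEnv.get`-based constructor path of `SocketAuthServer` needs an env with a usable `SparkConf`. A sketch of the save/mock/restore pattern from the diff as a reusable helper; `withMockedSparkEnv` is a hypothetical name, and the sketch assumes it compiles inside the `org.apache.spark` package, where `SparkEnv.set` is visible:

```scala
package org.apache.spark

import org.mockito.Mockito.mock

object MockSparkEnvSketch {
  // Install a mocked SparkEnv whose conf returns the given SparkConf, run
  // the body, and always restore the previous env so other tests are not
  // affected. Mirrors the try/finally in the diff above.
  def withMockedSparkEnv[T](conf: SparkConf)(body: => T): T = {
    val saved = SparkEnv.get
    try {
      val env = mock(classOf[SparkEnv])
      // Empty varargs pins Mockito's doReturn(Object, Object...) overload.
      org.mockito.Mockito.doReturn(conf, Seq.empty: _*).when(env).conf
      SparkEnv.set(env)
      body
    } finally {
      SparkEnv.set(saved)
    }
  }
}
```

With such a helper, the test body in the diff would shrink to `withMockedSparkEnv(new SparkConf()) { ... }` wrapped around the server/client interaction.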



