HyukjinKwon commented on a change in pull request #30389:
URL: https://github.com/apache/spark/pull/30389#discussion_r528432296
##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.mockito.Mockito.mock
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv,
SparkFunSuite}
Review comment:
```suggestion
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite}
```
##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.mockito.Mockito.mock
Review comment:
```suggestion
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
```
##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -30,15 +30,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.mockito.Mockito.mock
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkEnv,
SparkFunSuite}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.security.{SocketAuthHelper, SocketAuthServer}
import org.apache.spark.util.Utils
class PythonRDDSuite extends SparkFunSuite with LocalSparkContext {
+ private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value,
Seq.empty: _*)
+
var tempDir: File = _
Review comment:
```suggestion
var tempDir: File = _
```
##########
File path: core/src/main/scala/org/apache/spark/security/SocketAuthServer.scala
##########
@@ -34,31 +36,38 @@ import org.apache.spark.util.{ThreadUtils, Utils}
* handling one batch of data, with authentication and error handling.
*
* The socket server can only accept one connection, or close if no connection
- * in 15 seconds.
+ * in configurable amount of seconds (default 15).
*/
private[spark] abstract class SocketAuthServer[T](
authHelper: SocketAuthHelper,
- threadName: String) {
+ threadName: String) extends Logging {
def this(env: SparkEnv, threadName: String) = this(new
SocketAuthHelper(env.conf), threadName)
def this(threadName: String) = this(SparkEnv.get, threadName)
private val promise = Promise[T]()
private def startServer(): (Int, String) = {
+ logTrace("Creating listening socket")
val serverSocket = new ServerSocket(0, 1,
InetAddress.getByAddress(Array(127, 0, 0, 1)))
- // Close the socket if no connection in 15 seconds
- serverSocket.setSoTimeout(15000)
+ // Close the socket if no connection in configured seconds
Review comment:
```suggestion
// Close the socket if no connection in the configured seconds
```
##########
File path: core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
##########
@@ -76,12 +79,22 @@ class PythonRDDSuite extends SparkFunSuite with
LocalSparkContext {
}
test("python server error handling") {
- val authHelper = new SocketAuthHelper(new SparkConf())
- val errorServer = new ExceptionPythonServer(authHelper)
- val client = new Socket(InetAddress.getLoopbackAddress(), errorServer.port)
- authHelper.authToServer(client)
- val ex = intercept[Exception] { errorServer.getResult(Duration(1,
"second")) }
- assert(ex.getCause().getMessage().contains("exception within
handleConnection"))
+ val savedSparkEnv = SparkEnv.get
+ try {
+ val conf = new SparkConf()
+ val env = mock(classOf[SparkEnv])
+ doReturn(conf).when(env).conf
+ SparkEnv.set(env)
+
+ val authHelper = new SocketAuthHelper(conf)
+ val errorServer = new ExceptionPythonServer(authHelper)
+ val client = new Socket(InetAddress.getLoopbackAddress(),
errorServer.port)
+ authHelper.authToServer(client)
+ val ex = intercept[Exception] { errorServer.getResult(Duration(1,
"second")) }
+ assert(ex.getCause().getMessage().contains("exception within
handleConnection"))
+ } finally {
+ SparkEnv.set(savedSparkEnv)
+ }
}
Review comment:
```suggestion
val authHelper = new SocketAuthHelper(new SparkConf())
val errorServer = new ExceptionPythonServer(authHelper)
val client = new Socket(InetAddress.getLoopbackAddress(),
errorServer.port)
authHelper.authToServer(client)
val ex = intercept[Exception] { errorServer.getResult(Duration(1,
"second")) }
assert(ex.getCause().getMessage().contains("exception within
handleConnection"))
}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]