[GitHub] spark pull request #22770: [SPARK-25771][PYSPARK]Fix improper synchronizatio...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22770 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r227056168

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -31,15 +32,15 @@ import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}

 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
-  extends Logging {
+  extends Logging { self =>

   import PythonWorkerFactory._

   // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
   // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
   // currently only works on UNIX-based systems now because it uses signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
-  val useDaemon = {
+  private val useDaemon = {
--- End diff --

Fixing these since I'm touching a lot of fields in this file.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r226515494

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -31,15 +32,15 @@ import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}

 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
-  extends Logging {
+  extends Logging { self =>

   import PythonWorkerFactory._

   // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
   // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
   // currently only works on UNIX-based systems now because it uses signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
-  val useDaemon = {
+  private val useDaemon = {
--- End diff --

`daemonModule` and `workerModule` too.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r226515461

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -31,15 +32,15 @@ import org.apache.spark.security.SocketAuthHelper
 import org.apache.spark.util.{RedirectThread, Utils}

 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
-  extends Logging {
+  extends Logging { self =>

   import PythonWorkerFactory._

   // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
   // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
   // currently only works on UNIX-based systems now because it uses signals for child management,
   // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
-  val useDaemon = {
+  private val useDaemon = {
--- End diff --

Why are we changing this? It doesn't look directly related to the fix, and this is already a private class.
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r226459546

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -278,7 +289,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     override def run() {
       while (true) {
-        synchronized {
+        self.synchronized {
--- End diff --

This is fix 1.
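Fix 1 hinges on how Scala resolves an unqualified `synchronized`. Inside an inner class that extends `Thread`, `synchronized { ... }` means `this.synchronized`, so it takes the monitor of the thread object rather than the enclosing factory instance that the rest of the class locks. The `self =>` alias added to the class header gives the inner class a name for that outer instance. A minimal sketch, using a hypothetical `Factory` class (not Spark's real `PythonWorkerFactory`) and `Thread.holdsLock` to check which monitor is actually held:

```scala
// Hypothetical stand-in for PythonWorkerFactory, reduced to the locking pattern.
class Factory { self =>
  // Written by the monitor thread; read by the caller after join().
  @volatile var heldFactoryLockWithBareSync = false
  @volatile var heldFactoryLockWithSelfSync = false

  // Inner class, like the MonitorThread in the diff above.
  class MonitorThread extends Thread("monitor-sketch") {
    override def run(): Unit = {
      // Bare `synchronized` means `this.synchronized`: it locks the Thread object.
      synchronized {
        heldFactoryLockWithBareSync = Thread.holdsLock(self)
      }
      // `self.synchronized` locks the enclosing Factory instance instead.
      self.synchronized {
        heldFactoryLockWithSelfSync = Thread.holdsLock(self)
      }
    }
  }

  // Runs the monitor body once and waits for it to finish.
  def runMonitorOnce(): Unit = {
    val t = new MonitorThread
    t.start()
    t.join()
  }
}

val factory = new Factory
factory.runMonitorOnce()
println(factory.heldFactoryLockWithBareSync) // false
println(factory.heldFactoryLockWithSelfSync) // true
```

Both blocks compile without complaint, which is why this kind of bug is easy to miss: the bare form is still mutually exclusive, just against the wrong object.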
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/22770#discussion_r226459609

--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---
@@ -163,7 +172,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       try {
         val socket = serverSocket.accept()
         authHelper.authClient(socket)
-        simpleWorkers.put(socket, worker)
+        self.synchronized {
--- End diff --

This is fix 2.
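Fix 2 applies the usual rule for a lock-guarded field: the lock only protects `simpleWorkers` if every read and write takes it, including the `put` in `createSimpleWorker`. A plain `mutable.HashMap` is not thread-safe, so a single unguarded writer undermines the guarded readers. A minimal sketch of that contract; `Registry`, `Worker`, and `registerConcurrently` are hypothetical names, and the map is keyed by `Int` instead of the socket used in the diff:

```scala
import scala.collection.mutable

// Hypothetical value type for illustration only.
final case class Worker(id: Int)

class Registry { self =>
  // Guarded by `self`: every read and write holds this instance's monitor.
  private val simpleWorkers = mutable.HashMap.empty[Int, Worker]

  // The write takes the same lock the readers use (the point of fix 2).
  def register(socketId: Int, worker: Worker): Unit = self.synchronized {
    simpleWorkers(socketId) = worker
  }

  def size: Int = self.synchronized { simpleWorkers.size }
}

// Registers `perThread` workers from each of `threads` threads concurrently.
def registerConcurrently(registry: Registry, threads: Int, perThread: Int): Unit = {
  val writers = (0 until threads).map { t =>
    new Thread(new Runnable {
      override def run(): Unit =
        for (i <- 0 until perThread) {
          val id = t * perThread + i // unique across all threads
          registry.register(id, Worker(id))
        }
    })
  }
  writers.foreach(_.start())
  writers.foreach(_.join())
}

val registry = new Registry
registerConcurrently(registry, threads = 8, perThread = 1000)
println(registry.size) // 8000
```

Keeping the map `private`, in the same spirit as the `private val` changes discussed in this review, means every access site lives in one class, so the guarded-by contract can be audited by reading that class alone.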
GitHub user zsxwing opened a pull request: https://github.com/apache/spark/pull/22770

[SPARK-25771][PYSPARK]Fix improper synchronization in PythonWorkerFactory

## What changes were proposed in this pull request?

Fix the following issues in `PythonWorkerFactory`:
- `MonitorThread.run` uses the wrong lock.
- `createSimpleWorker` misses `synchronized` when updating `simpleWorkers`.

The other changes only improve code style to make the thread-safety contract clear.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zsxwing/spark pwf

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22770.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #22770

commit 0369de2323640377c0f990bb47ebe112654ca498
Author: Shixiong Zhu
Date: 2018-10-18T20:39:04Z

fix improper synchronization