This is an automated email from the ASF dual-hosted git repository. ueshin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cf64008fce7 [SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Set back USE_DAEMON after creating streaming python processes cf64008fce7 is described below commit cf64008fce77b38d1237874b04f5ac124b01b3a8 Author: Wei Liu <wei....@databricks.com> AuthorDate: Fri Aug 4 17:41:27 2023 -0700 [SPARK-44433][PYTHON][CONNECT][SS][FOLLOWUP] Set back USE_DAEMON after creating streaming python processes ### What changes were proposed in this pull request? Follow-up to this review comment: https://github.com/apache/spark/pull/42283#discussion_r1283804782 Restore the previous value of the Spark conf PYTHON_USE_DAEMON after creating the streaming Python worker process. ### Why are the changes needed? Bug fix: the conf was previously set to false and never restored. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Config-only change Closes #42341 from WweiL/SPARK-44433-followup-USEDAEMON. Authored-by: Wei Liu <wei....@databricks.com> Signed-off-by: Takuya UESHIN <ues...@databricks.com> --- .../spark/api/python/StreamingPythonRunner.scala | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala index f14289f984a..a079743c847 100644 --- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala @@ -66,14 +66,19 @@ private[spark] class StreamingPythonRunner( envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString) envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString) - conf.set(PYTHON_USE_DAEMON, false) envVars.put("SPARK_CONNECT_LOCAL_URL", connectUrl) - val (worker, _) = env.createPythonWorker( - pythonExec, workerModule, envVars.asScala.toMap) - pythonWorker = Some(worker) + val prevConf = conf.get(PYTHON_USE_DAEMON) + conf.set(PYTHON_USE_DAEMON, false) + try { + val (worker, _) = 
env.createPythonWorker( + pythonExec, workerModule, envVars.asScala.toMap) + pythonWorker = Some(worker) + } finally { + conf.set(PYTHON_USE_DAEMON, prevConf) + } - val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) + val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) // TODO(SPARK-44461): verify python version @@ -87,7 +92,8 @@ private[spark] class StreamingPythonRunner( dataOut.write(command.toArray) dataOut.flush() - val dataIn = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize)) + val dataIn = new DataInputStream( + new BufferedInputStream(pythonWorker.get.getInputStream, bufferSize)) val resFromPython = dataIn.readInt() logInfo(s"Runner initialization returned $resFromPython") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org