rangadi commented on code in PR #42779:
URL: https://github.com/apache/spark/pull/42779#discussion_r1326514203
##########
connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/StreamingQueryListenerHelper.scala:
##########
@@ -26,15 +26,13 @@ import org.apache.spark.sql.streaming.StreamingQueryListener
* instance of this class starts a python process, inside which has the python
handling logic.
* When a new event is received, it is serialized to json, and passed to the
python process.
*/
-class PythonStreamingQueryListener(
- listener: SimplePythonFunction,
- sessionHolder: SessionHolder,
- pythonExec: String)
+class PythonStreamingQueryListener(listener: SimplePythonFunction,
sessionHolder: SessionHolder)
extends StreamingQueryListener {
private val port = SparkConnectService.localPort
private val connectUrl =
s"sc://localhost:$port/;user_id=${sessionHolder.userId}"
- private val runner = StreamingPythonRunner(
+ // Scoped for testing
Review Comment:
Nit: _// Visible for testing._
##########
core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala:
##########
@@ -108,10 +108,30 @@ private[spark] class StreamingPythonRunner(
* Stops the Python worker.
*/
def stop(): Unit = {
- pythonWorker.foreach { worker =>
+ logInfo(s"Stopping streaming runner for sessionId: $sessionId, module:
$workerModule.")
+
+ try {
pythonWorkerFactory.foreach { factory =>
- factory.stopWorker(worker)
- factory.stop()
+ pythonWorker.foreach { worker =>
+ factory.stopWorker(worker)
+ factory.stop()
+ }
+ }
+ } catch {
+ case e: Exception =>
+ logError("Exception when trying to kill worker", e)
+ }
+ }
+
+ /**
+ * Returns whether the Python worker has been stopped.
+ * @return Some(true) if the Python worker has been stopped.
Review Comment:
Simpler to return `Boolean` and throw if no worker has been started. This is
only used in tests.
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends
SharedSparkSession {
sessionHolder.getDataFrameOrThrow(key1)
}
}
+
+ private def streamingForeachBatchFunction(pysparkPythonPath: String):
Array[Byte] = {
Review Comment:
Add a brief comment about what this does (and/or its purpose).
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHodlerSuite.scala:
##########
@@ -79,4 +92,196 @@ class SparkConnectSessionHolderSuite extends
SharedSparkSession {
sessionHolder.getDataFrameOrThrow(key1)
}
}
+
+ private def streamingForeachBatchFunction(pysparkPythonPath: String):
Array[Byte] = {
+ var binaryFunc: Array[Byte] = null
+ withTempPath { path =>
+ Process(
+ Seq(
+ IntegratedUDFTestUtils.pythonExec,
+ "-c",
+ "from pyspark.serializers import CloudPickleSerializer; " +
+ s"f = open('$path', 'wb');" +
+ "f.write(CloudPickleSerializer().dumps((" +
+ "lambda df, batchId: batchId)))"),
+ None,
+ "PYTHONPATH" -> pysparkPythonPath).!!
+ binaryFunc = Files.readAllBytes(path.toPath)
+ }
+ assert(binaryFunc != null)
+ binaryFunc
+ }
+
+ private def streamingQueryListenerFunction(pysparkPythonPath: String):
Array[Byte] = {
+ var binaryFunc: Array[Byte] = null
+ val pythonScript =
+ """
+ |from pyspark.sql.streaming.listener import StreamingQueryListener
+ |
+ |class MyListener(StreamingQueryListener):
+ | def onQueryStarted(e):
+ | pass
+ |
+ | def onQueryIdle(e):
+ | pass
+ |
+ | def onQueryProgress(e):
+ | pass
+ |
+ | def onQueryTerminated(e):
+ | pass
+ |
+ |listener = MyListener()
+ """.stripMargin
+ withTempPath { codePath =>
+ Files.write(codePath.toPath,
pythonScript.getBytes(StandardCharsets.UTF_8))
+ withTempPath { path =>
+ Process(
+ Seq(
+ IntegratedUDFTestUtils.pythonExec,
+ "-c",
+ "from pyspark.serializers import CloudPickleSerializer; " +
+ s"f = open('$path', 'wb');" +
+ s"exec(open('$codePath', 'r').read());" +
+ "f.write(CloudPickleSerializer().dumps(listener))"),
+ None,
+ "PYTHONPATH" -> pysparkPythonPath).!!
+ binaryFunc = Files.readAllBytes(path.toPath)
+ }
+ }
+ assert(binaryFunc != null)
+ binaryFunc
+ }
+
+ private def dummyPythonFunction(sessionHolder: SessionHolder)(
+ fcn: String => Array[Byte]): SimplePythonFunction = {
+ val sparkPythonPath =
+
s"${IntegratedUDFTestUtils.pysparkPythonPath}:${IntegratedUDFTestUtils.pythonPath}"
+
+ SimplePythonFunction(
+ command = fcn(sparkPythonPath),
+ envVars = mutable.Map("PYTHONPATH" -> sparkPythonPath).asJava,
+ pythonIncludes =
sessionHolder.artifactManager.getSparkConnectPythonIncludes.asJava,
+ pythonExec = IntegratedUDFTestUtils.pythonExec,
+ pythonVer = IntegratedUDFTestUtils.pythonVer,
+ broadcastVars = Lists.newArrayList(),
+ accumulator = null)
+ }
+
+ test("python foreachBatch process: process terminates after query is
stopped") {
+ // scalastyle:off assume
+ assume(IntegratedUDFTestUtils.shouldTestPythonUDFs)
+ // scalastyle:on assume
+
+ val sessionHolder = SessionHolder.forTesting(spark)
+ try {
+ SparkConnectService.start(spark.sparkContext)
+
+ val pythonFn =
dummyPythonFunction(sessionHolder)(streamingForeachBatchFunction)
+ val (fn1, cleaner1) =
+ StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn,
sessionHolder)
+ val (fn2, cleaner2) =
+ StreamingForeachBatchHelper.pythonForeachBatchWrapper(pythonFn,
sessionHolder)
+
+ val query1 = spark.readStream
+ .format("rate")
+ .load()
+ .writeStream
+ .format("memory")
+ .queryName("foreachBatch_termination_test_q1")
+ .foreachBatch(fn1)
+ .start()
+
+ val query2 = spark.readStream
+ .format("rate")
+ .load()
+ .writeStream
+ .format("memory")
+ .queryName("foreachBatch_termination_test_q2")
+ .foreachBatch(fn2)
+ .start()
+
+ sessionHolder.streamingForeachBatchRunnerCleanerCache
Review Comment:
Add a comment about why we are accessing the internal cleaner cache here.
It wouldn't have been clear to me if you hadn't explained it in VC. Refer to
how this is mimicking the Python foreachBatch API from Scala.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]