MonkeyCanCode commented on code in PR #53078:
URL: https://github.com/apache/spark/pull/53078#discussion_r2590565533
##########
sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala:
##########
@@ -110,6 +110,10 @@ private[sql] class SessionState(
   def catalogManager: CatalogManager = analyzer.catalogManager
+  override def close(): Unit = {
+    catalogManager.close()
Review Comment:
Thanks for the review and feedback. I just want to make sure I fully
understand the requirement here. When you mention this, are you thinking of the
`spark.newSession()` case? My main concern is avoiding a situation where
closing one session affects a session cloned from it, since the catalogs may
soon become closable as well
(https://github.com/apache/iceberg/pull/14590/files#diff-e263afc81b5b90944281c544005466e8671e7756f52b5b59b67a77e7c63aaf13).
Would you mind pointing me in the right direction for the implementation you
have in mind?
Again, thanks so much for your review and time.
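To make the concern concrete, here is a minimal self-contained sketch. All
names (`SharedCatalog`, `FakeSession`, `CloneCloseDemo`) are hypothetical
stand-ins, not Spark or Iceberg APIs; the point is only that when two sessions
share one closable catalog, `close()` on the original must not tear down state
the clone still needs:
```scala
import java.io.Closeable

// Hypothetical closable catalog shared by an original session and its clone.
class SharedCatalog extends Closeable {
  @volatile private var closed = false
  override def close(): Unit = closed = true
  def lookup(name: String): String = {
    require(!closed, "catalog already closed")
    s"table:$name"
  }
}

// Hypothetical session wrapper; stands in for SessionState in this sketch.
class FakeSession(val catalog: SharedCatalog) extends Closeable {
  def cloneSession(): FakeSession = new FakeSession(catalog) // shares the catalog
  override def close(): Unit = catalog.close()
}

object CloneCloseDemo extends App {
  val original = new FakeSession(new SharedCatalog)
  val clone = original.cloneSession()
  original.close()
  // The clone now fails because close() on the original tore down shared state.
  clone.catalog.lookup("t") // throws IllegalArgumentException
}
```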
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala:
##########
@@ -137,13 +137,16 @@ private[sql] class SparkConnectListenerBusListener(
}
def sendResultComplete(): Unit = {
-    responseObserver
-      .asInstanceOf[ExecuteResponseObserver[ExecutePlanResponse]]
-      .onNextComplete(
-        ExecutePlanResponse
-          .newBuilder()
-          .setResultComplete(ExecutePlanResponse.ResultComplete.newBuilder().build())
-          .build())
+    responseObserver match {
Review Comment:
This was added because unit tests that use a mock observer were failing with a
`ClassCastException` earlier (a sample error can be found in
https://github.com/MonkeyCanCode/spark/actions/runs/19394233457/job/55492018508).
Here is the snippet:
```
[info] org.apache.spark.sql.connect.pipelines.EndToEndAPISuite *** ABORTED *** (81 milliseconds)
[info] java.lang.ClassCastException: class io.grpc.stub.StreamObserver$MockitoMock$lemjBfIA cannot be cast to class org.apache.spark.sql.connect.execution.ExecuteResponseObserver (io.grpc.stub.StreamObserver$MockitoMock$lemjBfIA and org.apache.spark.sql.connect.execution.ExecuteResponseObserver are in unnamed module of loader 'app')
[info] at org.apache.spark.sql.connect.service.SparkConnectListenerBusListener.sendResultComplete(SparkConnectListenerBusListener.scala:140)
[info] at org.apache.spark.sql.connect.service.ServerSideListenerHolder.cleanUp(SparkConnectListenerBusListener.scala:84)
[info] at org.apache.spark.sql.connect.service.SessionHolder.close(SessionHolder.scala:370)
[info] at org.apache.spark.sql.connect.service.SparkConnectSessionManager.shutdownSessionHolder(SparkConnectSessionManager.scala:232)
[info] at org.apache.spark.sql.connect.service.SparkConnectSessionManager.$anonfun$invalidateAllSessions$1(SparkConnectSessionManager.scala:369)
[info] at java.base/java.util.concurrent.ConcurrentHashMap.forEach(ConcurrentHashMap.java:1603)
[info] at org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions(SparkConnectSessionManager.scala:367)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.clearAllExecutions(SparkConnectServerTest.scala:83)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.clearAllExecutions$(SparkConnectServerTest.scala:80)
[info] at org.apache.spark.sql.connect.pipelines.EndToEndAPISuite.clearAllExecutions(EndToEndAPISuite.scala:37)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.beforeEach(SparkConnectServerTest.scala:72)
[info] at org.apache.spark.sql.connect.SparkConnectServerTest.beforeEach$(SparkConnectServerTest.scala:70)
[info] at org.apache.spark.sql.connect.pipelines.EndToEndAPISuite.beforeEach(EndToEndAPISuite.scala:37)
[info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:233)
[info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
[info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:68)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
[info] at scala.collection.immutable.List.foreach(List.scala:323)
[info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
[info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
[info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
[info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
[info] at org.scalatest.Suite.run(Suite.scala:1114)
[info] at org.scalatest.Suite.run$(Suite.scala:1096)
[info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
[info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
[info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:68)
[info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
[info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
[info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
[info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:68)
[info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
[info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
[info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
[info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
```
Before this PR, the session cleanup process in the tests was less explicit.
Now that resources are properly closed on session shutdown, the generic mock
`StreamObserver` used in tests breaks the existing unconditional cast, hence
the pattern match.
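Since the hunk above is truncated right after `responseObserver match {`, here
is roughly the shape of the fix as I understand it; the fallback branch for a
plain `StreamObserver` is my sketch of the intent, not a verbatim quote of the
final code:
```scala
// Sketch of the body of sendResultComplete(); `responseObserver` is the
// listener's StreamObserver[ExecutePlanResponse] field shown in the hunk above.
val response = ExecutePlanResponse
  .newBuilder()
  .setResultComplete(ExecutePlanResponse.ResultComplete.newBuilder().build())
  .build()

responseObserver match {
  // Server path: the observer really is an ExecuteResponseObserver.
  // `@unchecked` silences the erasure warning the old asInstanceOf hid.
  case observer: ExecuteResponseObserver[ExecutePlanResponse @unchecked] =>
    observer.onNextComplete(response)
  // Test path: Mockito mocks are plain StreamObservers, so fall back to
  // onNext instead of throwing a ClassCastException.
  case observer =>
    observer.onNext(response)
}
```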
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectTestUtils.scala:
##########
@@ -31,6 +31,9 @@ object SparkConnectTestUtils {
sessionId = UUID.randomUUID().toString,
session = session)
SparkConnectService.sessionManager.putSessionForTesting(ret)
+ if (session != null) {
+ ret.initializeSession()
Review Comment:
Earlier, the tests created dummy sessions that were never fully initialized.
Now that all sessions get properly closed, calling `close()` on an
uninitialized session was causing test failures as well, so this initializes
real sessions up front.
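To make that failure mode concrete, here is a minimal sketch; `Holder` and its
fields are hypothetical stand-ins, not the actual `SessionHolder` API. Closing
a holder whose resources were never set up touches state that does not exist
yet, which is why the test utility now calls `initializeSession()` first:
```scala
// Hypothetical stand-in for SessionHolder; not the actual Spark API.
class Holder {
  private var resource: java.io.Closeable = _ // set by initializeSession()

  def initializeSession(): Unit = {
    resource = new java.io.StringReader("") // placeholder closable resource
  }

  def close(): Unit = {
    // Without the null guard in the test utility, dummy sessions hit this
    // kind of failure because initializeSession() was never called.
    resource.close()
  }
}

object InitBeforeCloseDemo extends App {
  val uninitialized = new Holder
  // uninitialized.close() // would throw NullPointerException

  val initialized = new Holder
  initialized.initializeSession()
  initialized.close() // fine: resource exists before close()
}
```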