jdesjean commented on code in PR #41443:
URL: https://github.com/apache/spark/pull/41443#discussion_r1265791972
##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala:
##########
@@ -378,4 +634,210 @@ class SparkConnectServiceSuite extends SharedSparkSession
{
assert(valuesList.last.hasLong && valuesList.last.getLong == 99)
}
}
+
+ /**
+  * Runs `f` inside the nested fixtures used by command tests.
+  *
+  * The body is wrapped, from outermost to innermost, in `withView("testview")`,
+  * `withTable("testcat.testtable")`, `withSparkConf` (setting the `testcat` catalog implementation
+  * and the command-extension plugin class) and `withEvents`. Immediately before `f` is invoked, a
+  * mocked [[StreamingQuery]] whose `id` and `runId` both return `DEFAULT_UUID` is registered with
+  * the streaming session manager for the isolated session ("c1", "s1").
+  *
+  * @param f test body; receives the [[VerifyEvents]] instance supplied by `withEvents`
+  */
+ protected def withCommandTest(f: VerifyEvents => Unit): Unit = {
+   // Builds the mocked query and registers it, in the same order as the calls appear below.
+   def registerMockedQuery(): Unit = {
+     val restartedQuery = mock[StreamingQuery]
+     when(restartedQuery.id).thenReturn(DEFAULT_UUID)
+     when(restartedQuery.runId).thenReturn(DEFAULT_UUID)
+
+     val manager = SparkConnectService.streamingSessionManager
+     val session = SparkConnectService.getOrCreateIsolatedSession("c1", "s1")
+     manager.registerNewStreamingQuery(session, restartedQuery)
+   }
+
+   withView("testview") {
+     withTable("testcat.testtable") {
+       val catalogConf =
+         "spark.sql.catalog.testcat" -> classOf[InMemoryPartitionTableCatalog].getName
+       val pluginConf =
+         Connect.CONNECT_EXTENSIONS_COMMAND_CLASSES.key ->
+           "org.apache.spark.sql.connect.plugin.ExampleCommandPlugin"
+
+       withSparkConf(catalogConf, pluginConf) {
+         withEvents { events =>
+           registerMockedQuery()
+           f(events)
+         }
+       }
+     }
+   }
+ }
+
+ /**
+  * Applies the given key/value pairs to the configuration returned by `SparkEnv.get.conf` for the
+  * duration of `f`.
+  *
+  * Once `f` finishes, whether normally or by throwing, every touched key is put back the way it
+  * was found: a key that held a value beforehand gets that value back, and a key that was unset
+  * is removed.
+  *
+  * @param pairs configuration entries to apply while `f` runs
+  * @param f body to execute with the entries applied
+  */
+ protected def withSparkConf(pairs: (String, String)*)(f: => Unit): Unit = {
+   val conf = SparkEnv.get.conf
+   // Snapshot prior values before any write, once per key, so that a key listed twice in
+   // `pairs` is still restored to what it held before this call.
+   val previous = pairs.map(_._1).distinct.map(key => key -> conf.getOption(key))
+   pairs.foreach { case (key, value) => conf.set(key, value) }
+   try f
+   finally {
+     previous.foreach {
+       case (key, Some(value)) => conf.set(key, value)
+       case (key, None) => conf.remove(key)
+     }
+   }
+ }
+
+ /**
+  * Runs `f` with a fresh [[VerifyEvents]] whose listener is attached to the Spark context.
+  *
+  * After `f` returns, all sessions are invalidated and `verifyEvents.onSessionClosed()` is
+  * called. The cleanup block passed to `Utils.tryWithSafeFinally` then waits via
+  * `verifyEvents.waitUntilEmpty()`, detaches the listener, invalidates all sessions again and
+  * resets the plugin registry.
+  *
+  * @param f test body; receives the [[VerifyEvents]] instance created here
+  */
+ protected def withEvents(f: VerifyEvents => Unit): Unit = {
+   val verifyEvents = new VerifyEvents(spark.sparkContext)
+   spark.sparkContext.addSparkListener(verifyEvents.listener)
+   Utils.tryWithSafeFinally({
+     f(verifyEvents)
+     SparkConnectService.invalidateAllSessions()
+     verifyEvents.onSessionClosed()
+   }) {
+     // Nested try/finally: if waitUntilEmpty() throws, the listener must still be detached and
+     // the shared service state still reset, otherwise both leak into whatever runs next.
+     try verifyEvents.waitUntilEmpty()
+     finally {
+       spark.sparkContext.removeSparkListener(verifyEvents.listener)
+       SparkConnectService.invalidateAllSessions()
+       SparkConnectPluginRegistry.reset()
+     }
+   }
+ }
+
+ /**
+  * Registers one test per element of `params`.
+  *
+  * Each test is named `testNamePrefix` followed by a space and the parameter's string form in
+  * parentheses, carries `testTags`, and runs `testFun` on that parameter.
+  *
+  * @param testNamePrefix common prefix of the generated test names
+  * @param testTags tags forwarded to every generated test
+  * @param params values to generate tests for
+  * @param testFun test body, invoked with the parameter of the generated test
+  * @tparam A type of the parameters
+  */
+ protected def gridTest[A](testNamePrefix: String, testTags: Tag*)(params: Seq[A])(
+     testFun: A => Unit): Unit =
+   params.foreach { param =>
+     test(s"$testNamePrefix ($param)", testTags: _*)(testFun(param))
+   }
+
+ /**
+  * Closed set of status markers, each tagged with an integer code.
+  *
+  * @param value integer code of the status; exposed as a member so the codes passed by the case
+  *              objects below can actually be read and compared
+  */
+ sealed abstract class Status(val value: Int)
+
+ /** The individual statuses; codes increase from 0 to 9 in declaration order. */
+ object Status {
+   case object Pending extends Status(0)
+   case object SessionStarted extends Status(1)
+   case object Started extends Status(2)
+   case object Analyzed extends Status(3)
+   case object ReadyForExecution extends Status(4)
+   case object Finished extends Status(5)
+   case object Failed extends Status(6)
+   case object Canceled extends Status(7)
+   case object Closed extends Status(8)
+   case object SessionClosed extends Status(9)
+ }
Review Comment:
Moved assertions to EventsManager
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]