juliuszsompolski commented on code in PR #43962:
URL: https://github.com/apache/spark/pull/43962#discussion_r1402712036


##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala:
##########
@@ -40,8 +40,7 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
       val reattachableIter = getReattachableIterator(iter)
       val initialInnerIter = reattachableIter.innerIterator
 
-      // open the iterator
-      iter.next()
+      iter.next() // open iterator, guarantees that the RPC reached the server

Review Comment:
   Opening the iterator is still needed in these tests. It's an easy way to 
assert that the query has actually reached the server, without having to 
resort to an Eventually check of the server state.
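
   For illustration only, a rough sketch (not code from this PR) of the 
Eventually-based polling alternative that this avoids, reusing the 
`eventuallyTimeout` and `SparkConnectService.executionManager` names visible 
in the other hunks; `next()` proves the same thing in a single call because it 
can only return once the server has produced the first response:

      // Polling server state until the execution shows up (sketch only):
      Eventually.eventually(timeout(eventuallyTimeout)) {
        assert(SparkConnectService.executionManager.listExecuteHolders.length == 1)
      }
      // Kept approach: next() blocks until the first response arrives, so reaching
      // the next line already guarantees the RPC reached the server.
      iter.next()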



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala:
##########
@@ -32,16 +32,23 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
   // were all already in the buffer.
   val BIG_ENOUGH_QUERY = "select * from range(1000000)"
 
+  test("Execute is sent eagerly to the server upon iterator creation") {

Review Comment:
   The test added here explicitly checks the behavior change: the query is now 
submitted eagerly when the iterator is created.



##########
connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/SparkConnectClientSuite.scala:
##########
@@ -482,6 +485,72 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
     iter.foreach(_ => ())
     assert(reattachableIter.resultComplete)
   }
+
+  test("GRPC stub unary call throws error immediately") {

Review Comment:
   The tests added in this file check the assumptions under which the 
error-retrying logic operates.
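
   As a rough, hypothetical illustration of one such assumption (this is not 
the test code from the PR), using the raw generated blocking stub against an 
arbitrary endpoint: a unary RPC surfaces connection errors synchronously at 
the call site, while a server-streaming RPC hands back an iterator and only 
surfaces errors once that iterator is consumed:

      import io.grpc.ManagedChannelBuilder
      import org.apache.spark.connect.proto

      // Hypothetical endpoint; host and port are placeholders.
      val channel = ManagedChannelBuilder.forAddress("localhost", 15002).usePlaintext().build()
      val stub = proto.SparkConnectServiceGrpc.newBlockingStub(channel)

      // Unary call: blocks for the single response, so e.g. an UNAVAILABLE
      // StatusRuntimeException is thrown right here.
      val analyzed = stub.analyzePlan(proto.AnalyzePlanRequest.newBuilder().build())

      // Server-streaming call: returns an iterator; the same error would only
      // surface when hasNext()/next() is called.
      val responses = stub.executePlan(proto.ExecutePlanRequest.newBuilder().build())
      responses.hasNext()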



##########
connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala:
##########
@@ -32,16 +32,23 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
   // were all already in the buffer.
   val BIG_ENOUGH_QUERY = "select * from range(1000000)"
 
+  test("Execute is sent eagerly to the server upon iterator creation") {
+    withClient { client =>
+      val query = client.execute(buildPlan(BIG_ENOUGH_QUERY))
+      // just creating the iterator triggers query to be sent to server.
+      Eventually.eventually(timeout(eventuallyTimeout)) {
+        assert(SparkConnectService.executionManager.listExecuteHolders.length == 1)
+      }
+      assert(query.hasNext)
+    }
+  }
+
   test("ReleaseSession releases all queries and does not allow more requests 
in the session") {
     withClient { client =>
       val query1 = client.execute(buildPlan(BIG_ENOUGH_QUERY))
       val query2 = client.execute(buildPlan(BIG_ENOUGH_QUERY))
-      val query3 = client.execute(buildPlan("select 1"))
-      // just creating the iterator is lazy, trigger query1 and query2 to be sent.
-      query1.hasNext
-      query2.hasNext
       Eventually.eventually(timeout(eventuallyTimeout)) {
-        SparkConnectService.executionManager.listExecuteHolders.length == 2
+        assert(SparkConnectService.executionManager.listExecuteHolders.length == 2)

Review Comment:
   The missing `assert` here was a tiny bug. The `hasNext` calls had the side 
effect of asserting, by themselves, that the query had arrived at the server 
and that the server had started working on it (`hasNext` can only return once 
the server actually returns the first response...)
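
   A small self-contained sketch of the pitfall (nothing here is PR code; it 
only assumes ScalaTest's `Eventually`): a bare boolean is just the block's 
return value, so `eventually` accepts the very first evaluation even when it 
is false, whereas wrapping the condition in `assert` makes the block throw and 
`eventually` keep retrying until the condition holds or the timeout expires:

      import org.scalatest.Assertions.assert
      import org.scalatest.concurrent.Eventually._
      import org.scalatest.time.SpanSugar._

      object EventuallyAssertSketch extends App {
        var attempts = 0
        def conditionHolds(): Boolean = { attempts += 1; attempts >= 10 }

        // Bug pattern: the block never throws, so this returns after a single
        // evaluation even though the condition was still false.
        eventually(timeout(5.seconds)) { conditionHolds() }
        println(s"without assert: stopped after $attempts evaluation(s)") // 1

        // Fixed pattern: assert throws while the condition is false, so
        // eventually keeps retrying until it holds (or the timeout expires).
        attempts = 0
        eventually(timeout(5.seconds)) { assert(conditionHolds()) }
        println(s"with assert: condition held after $attempts evaluation(s)") // 10
      }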



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

