juliuszsompolski commented on code in PR #43745:
URL: https://github.com/apache/spark/pull/43745#discussion_r1402204421


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -250,15 +250,18 @@ class SparkSession private[sql] (
          .setSql(sqlText)
          .addAllPosArguments(args.map(lit(_).expr).toImmutableArraySeq.asJava)))
     val plan = proto.Plan.newBuilder().setCommand(cmd)
-    // .toBuffer forces that the iterator is consumed and closed
-    val responseSeq = client.execute(plan.build()).toBuffer.toSeq
+    val responseIter = client.execute(plan.build())
 
-    val response = responseSeq
-      .find(_.hasSqlCommandResult)
-      .getOrElse(throw new RuntimeException("SQLCommandResult must be present"))
-
-    // Update the builder with the values from the result.
-    builder.mergeFrom(response.getSqlCommandResult.getRelation)
+    try {
+      val response = responseIter
+        .find(_.hasSqlCommandResult)
+        .getOrElse(throw new RuntimeException("SQLCommandResult must be present"))
+      // Update the builder with the values from the result.
+      builder.mergeFrom(response.getSqlCommandResult.getRelation)
+    } finally {
+      // consume the rest of the iterator
+      responseIter.foreach(_ => ())

Review Comment:
   I did think about it while reviewing this PR :-).
   
   `responseIterator.close()` would also potentially interrupt the execution if it was still running. Even though we have already gotten what we wanted, it's possible that on the server side ExecuteThreadRunner was still busy wrapping up and shutting down. A `close()` that came at that moment could still result in the thread being interrupted, and the query ending in a CANCELED state. I.e., there could be a race between the execution shutting down and the ReleaseExecute in the `close()` killing it. I think nothing critical would happen other than the query being considered CANCELED, but I didn't want to deal with that race, so I prefer to consume.
   
   If we consume the iterator to the end, this race is prevented by this block https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L180-L198, where the execution is no longer interruptible after it sends the ResultComplete.
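   To illustrate the pattern under discussion, here is a minimal, self-contained sketch (with a hypothetical `Response` type and `process` helper, not the actual Spark Connect classes): `find` stops at the first match and leaves the iterator partially consumed, so the `finally` block drains the remainder, letting the stream complete normally instead of relying on a `close()` that might interrupt the server-side execution.

   ```scala
   // Hypothetical stand-in for an ExecutePlanResponse from the response stream.
   case class Response(hasSqlCommandResult: Boolean)

   def process(responses: Iterator[Response]): Response = {
     try {
       // `find` consumes elements only up to (and including) the first match.
       responses
         .find(_.hasSqlCommandResult)
         .getOrElse(throw new RuntimeException("SQLCommandResult must be present"))
     } finally {
       // Drain the rest so the stream reaches its natural end on the server,
       // avoiding the close()-vs-shutdown race described above.
       responses.foreach(_ => ())
     }
   }
   ```

   After `process` returns, the iterator is fully exhausted even though the result was found partway through.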



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

