juliuszsompolski commented on code in PR #43745:
URL: https://github.com/apache/spark/pull/43745#discussion_r1402204421
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##########
@@ -250,15 +250,18 @@ class SparkSession private[sql] (
.setSql(sqlText)
.addAllPosArguments(args.map(lit(_).expr).toImmutableArraySeq.asJava)))
val plan = proto.Plan.newBuilder().setCommand(cmd)
- // .toBuffer forces that the iterator is consumed and closed
- val responseSeq = client.execute(plan.build()).toBuffer.toSeq
+ val responseIter = client.execute(plan.build())
- val response = responseSeq
- .find(_.hasSqlCommandResult)
- .getOrElse(throw new RuntimeException("SQLCommandResult must be present"))
-
- // Update the builder with the values from the result.
- builder.mergeFrom(response.getSqlCommandResult.getRelation)
+ try {
+ val response = responseIter
+ .find(_.hasSqlCommandResult)
+ .getOrElse(throw new RuntimeException("SQLCommandResult must be present"))
+ // Update the builder with the values from the result.
+ builder.mergeFrom(response.getSqlCommandResult.getRelation)
+ } finally {
+ // consume the rest of the iterator
+ responseIter.foreach(_ => ())
Review Comment:
I did think about it while reviewing this PR :-).
`responseIterator.close()` could also interrupt the execution if it was still
running. Even though we have already gotten what we wanted, it's possible that
on the server side the ExecuteThreadRunner was still busy wrapping up and
shutting down. A `close()` arriving at that moment could still interrupt the
thread and leave the query in a CANCELED state. In other words, there could be
a race between the execution shutting down and the ReleaseExecute issued by
`close()` killing it. Nothing critical would happen beyond the query being
considered CANCELED, but I didn't want to deal with that race, so I prefer to
consume.
If we consume the iterator to the end, this race is prevented by this block
https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L180-L198,
where the execution is no longer interruptible after it sends the
ResultComplete.
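To make the pattern concrete, here is a minimal, self-contained sketch of the "drain to completion" approach used in the diff above. The `Response` case class and `firstResultThenDrain` helper are hypothetical stand-ins for the protobuf response and the client code; the point is that after extracting the result we need, the `finally` block consumes the remaining responses so the server-side stream reaches its natural end rather than being interrupted by an early `close()`.

```scala
object DrainExample {
  // Hypothetical stand-in for the protobuf ExecutePlanResponse.
  final case class Response(hasSqlCommandResult: Boolean, payload: String)

  def firstResultThenDrain(responses: Iterator[Response]): String = {
    try {
      responses
        .find(_.hasSqlCommandResult)
        .map(_.payload)
        .getOrElse(throw new RuntimeException("SQLCommandResult must be present"))
    } finally {
      // Consume the rest of the iterator (mirrors responseIter.foreach(_ => ()))
      // so the stream completes normally instead of being cut off mid-flight.
      responses.foreach(_ => ())
    }
  }
}
```

For example, `firstResultThenDrain(Iterator(Response(false, "a"), Response(true, "b"), Response(false, "c")))` returns `"b"` while still exhausting the iterator, because `find` stops at the first match and the `finally` block walks the remainder.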
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]