juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288523015
##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
/**
* Retries the given function with exponential backoff according to the
client's retryPolicy.
*/
- private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
- GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+ private def retry[T](fn: => T): T =
+ GrpcRetryHandler.retry(retryPolicy)(fn)
Review Comment:
This uses the async stub, which means that releaseExecute gets delegated to
be executed in the background in a different grpc thread, and the
StreamObserver is a called back onNext / onError. In this case, we are not
interested in the onNext at all (it's a fire-and-forget), but we want to retry
onError, so we need to catch the error and trigger the retry from onError... So
using RetryStreamObserver also doesn't fit here...
It could be another utility RetryAsyncOnError added to GrpcRetryHandler, and
the logic of it could be added to the async CustomSparkConnectStub instead of
being inlined here, but it does not fit the current retrier.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]