juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1288523015


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala:
##########
@@ -297,6 +297,6 @@ class ExecutePlanResponseReattachableIterator(
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
    */
-  private def retry[T](fn: => T, currentRetryNum: Int = 0): T =
-    GrpcRetryHandler.retry(retryPolicy)(fn, currentRetryNum)
+  private def retry[T](fn: => T): T =
+    GrpcRetryHandler.retry(retryPolicy)(fn)

Review Comment:
   This uses the async stub, which means that releaseExecute is delegated to a 
background gRPC thread, and the StreamObserver is called back via onNext / 
onError. In this case we are not interested in onNext at all (it's 
fire-and-forget), but we do want to retry on error, so we need to catch the 
error and trigger the retry from onError. So RetryStreamObserver doesn't fit 
here either.
   It could be another utility, RetryAsyncOnError, added to GrpcRetryHandler, 
and its logic could live in the async CustomSparkConnectStub instead of being 
inlined here, but it does not fit the current retrier.
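
   To make the idea concrete, here is a rough sketch of what such a helper 
could look like. Everything in it is an assumption for illustration: 
RetryAsyncOnError only borrows the name suggested above and is not an existing 
Spark API, the local StreamObserver trait is a stand-in for 
io.grpc.stub.StreamObserver so the snippet runs without gRPC on the classpath, 
and maxRetries / canRetry are hypothetical parameters rather than the real 
retryPolicy fields. Backoff between attempts is left out for brevity.

```scala
// Stand-in for io.grpc.stub.StreamObserver (same three callbacks),
// defined locally so this sketch has no gRPC dependency.
trait StreamObserver[T] {
  def onNext(value: T): Unit
  def onError(t: Throwable): Unit
  def onCompleted(): Unit
}

// Hypothetical helper: re-issues an async call from inside onError.
object RetryAsyncOnError {
  /**
   * @param maxRetries how many times to re-issue the call after a failure
   * @param canRetry   decides whether a given error is worth retrying
   * @param call       issues the async call, reporting to the given observer
   */
  def apply[T](maxRetries: Int, canRetry: Throwable => Boolean)(
      call: StreamObserver[T] => Unit): Unit = {
    def attempt(retriesLeft: Int): Unit = {
      call(new StreamObserver[T] {
        // Fire-and-forget: responses and completion are ignored.
        override def onNext(value: T): Unit = ()
        override def onCompleted(): Unit = ()
        // The retry is triggered from the error callback itself,
        // with a fresh observer for the next attempt.
        override def onError(t: Throwable): Unit =
          if (retriesLeft > 0 && canRetry(t)) attempt(retriesLeft - 1)
      })
    }
    attempt(maxRetries)
  }
}

object Demo {
  // Fake async call that fails twice and then succeeds.
  // Returns how many times the call was issued.
  def run(): Int = {
    var calls = 0
    RetryAsyncOnError[String](maxRetries = 3, canRetry = _ => true) { observer =>
      calls += 1
      if (calls < 3) observer.onError(new RuntimeException("transient"))
      else { observer.onNext("released"); observer.onCompleted() }
    }
    calls
  }
}
```

   The call is passed in as a function of the observer, so each retry gets a 
fresh observer that carries the remaining retry budget. A blocking retrier 
that wraps `fn: => T` cannot do this, because the failure only surfaces later 
in the callback and never as an exception thrown from the call site.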



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

