juliuszsompolski commented on code in PR #42399:
URL: https://github.com/apache/spark/pull/42399#discussion_r1291187602


##########
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -148,37 +151,58 @@ private[client] object GrpcRetryHandler extends Logging {
 
   /**
    * Retries the given function with exponential backoff according to the client's retryPolicy.
+   *
    * @param retryPolicy
    *   The retry policy
+   * @param sleep
+   *   The function which sleeps (takes number of milliseconds to sleep)
    * @param fn
    *   The function to retry.
-   * @param currentRetryNum
-   *   Current number of retries.
    * @tparam T
    *   The return type of the function.
    * @return
    *   The result of the function.
    */
-  @tailrec final def retry[T](retryPolicy: RetryPolicy)(fn: => T, currentRetryNum: Int = 0): T = {
-    if (currentRetryNum > retryPolicy.maxRetries) {
-      throw new IllegalArgumentException(
-        s"The number of retries ($currentRetryNum) must not exceed " +
-          s"the maximum number of retires (${retryPolicy.maxRetries}).")
-    }
-    try {
-      return fn
-    } catch {
-      case NonFatal(e)
-          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
-            && currentRetryNum < retryPolicy.maxRetries =>
-        logWarning(
-          s"Non fatal error during RPC execution: $e, " +
-            s"retrying (currentRetryNum=$currentRetryNum)")
-        Thread.sleep(
-          (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-            .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
+  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
+      fn: => T): T = {
+    var currentRetryNum = 0
+    var lastException: Throwable = null
+    var nextBackoff: Duration = retryPolicy.initialBackoff
+
+    while (currentRetryNum <= retryPolicy.maxRetries) {
+      if (currentRetryNum != 0) {
+        var currentBackoff = nextBackoff
+        if (currentBackoff >= retryPolicy.minJitterThreshold) {
+          currentBackoff += Random.nextDouble() * retryPolicy.jitter
+        }
+        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+
+        sleep(currentBackoff.toMillis)
+      }
+
+      try {
+        return fn
+      } catch {
+        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+          currentRetryNum += 1
+          lastException = e
+
+          if (currentRetryNum <= retryPolicy.maxRetries) {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"retrying (currentRetryNum=$currentRetryNum)")
+          } else {
+            logWarning(
+              s"Non-Fatal error during RPC execution: $e, " +
+                s"exceeded retries (currentRetryNum=$currentRetryNum)")
+          }
+      }
     }
-    retry(retryPolicy)(fn, currentRetryNum + 1)
+
+    throw new SparkException(
+      errorClass = "EXCEED_RETRY",

Review Comment:
   @cdkrot I looked again at the Python code:
   
https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/client/core.py#L1638C1-L1647C22
   ```
               # If the number of retries have exceeded the maximum allowed retries.
               if retry_state.count() > self._max_retries:
                   e = retry_state.exception()
                   if e is not None:
                       raise e
                   else:
                       raise PySparkRuntimeError(
                           error_class="EXCEED_RETRY",
                           message_parameters={},
                       )
   ```
   It seems that if there was an exception, it just rethrows that exception. It
throws the EXCEED_RETRY exception only if the number of retries has been
exceeded but no exception is saved in the retry state
(`retry_state.exception()` returns `None`). I don't think that situation is
even possible, because in
https://github.com/apache/spark/blob/master/python/pyspark/sql/connect/client/core.py#L1578
we only ever retry if there is an exception. So the EXCEED_RETRY error in
Python can only occur if the code is faulty, with an invalid use of
RetryState.set_exception, which makes it an internal error. And then the
message "Retries exceeded but no exception caught." actually fits the
situation.
   @HyukjinKwon @itholic should this be its own error class in Python if it's
an internal should-never-happen assertion?
   
   So I think this error class isn't needed in Scala (there is always an
underlying exception available to throw at this place in the code), and we
should just rethrow the original exception, as Python does.
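
As a rough illustration of the "rethrow the original exception" behavior, here is a minimal Python sketch. It is not the Spark Connect client code: the `retry` helper, its parameters (`can_retry`, `max_retries`, `sleep`, and the backoff settings), and the default values are all hypothetical. It only shows that a retry loop which retries solely on exceptions always has an exception in hand when retries run out, so no separate "retries exceeded" error is needed.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry(
    fn: Callable[[], T],
    can_retry: Callable[[BaseException], bool],
    max_retries: int,
    sleep: Callable[[float], None] = time.sleep,  # injectable so tests need not wait
    initial_backoff: float = 0.05,  # seconds; illustrative default only
    backoff_multiplier: float = 4.0,
    max_backoff: float = 60.0,
) -> T:
    """Hypothetical helper: call fn, retrying retryable errors with backoff."""
    backoff = initial_backoff
    for attempt in range(max_retries + 1):
        if attempt > 0:
            # Back off before every attempt except the first one.
            sleep(backoff)
            backoff = min(backoff * backoff_multiplier, max_backoff)
        try:
            return fn()
        except Exception as e:
            # Non-retryable error, or the last allowed attempt: rethrow the
            # original exception instead of wrapping it in a new error.
            if not can_retry(e) or attempt == max_retries:
                raise
    # Only reachable if max_retries is negative, i.e. fn was never called.
    raise AssertionError("retry loop finished without calling fn")
```

With `max_retries=2` and a function that always fails with a retryable error, this sketch calls the function three times, sleeps twice, and then surfaces the third failure unchanged to the caller.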



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

