juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389609233


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the 
client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies list of policies to apply (in order)
+   * @param sleep typically Thread.sleep
+   * @param fn the function to compute
+   * @tparam T result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = 
Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of 
retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min 
retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: 
=> T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
 
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {

Review Comment:
   You could write `makeAttempt` as a single `try`/`catch` expression:
   ```
       def makeAttempt(): Option[T] = {
         try {
           Some(fn)
         } catch {
           case NonFatal(e) if canRetry(e) =>
             currentRetryNum += 1
             exceptionList = e +: exceptionList
             None
         }
       }
   ```
   so that it does not need an explicit `return`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to