cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1389508139


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -149,65 +152,68 @@ private[sql] class GrpcRetryHandler(
 }
 
 private[sql] object GrpcRetryHandler extends Logging {
-
   /**
-   * Retries the given function with exponential backoff according to the 
client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
-   * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
-   * @param fn
-   *   The function to retry.
-   * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   * Class managing the state of the retrying logic.
+   * @param retryPolicies, list of policies to apply (in order)
+   * @param sleep, typically Thread.sleep
+   * @param fn, the function to compute
+   * @tparam T, result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = 
Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of 
retries")
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: 
=> T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicyState] = retryPolicies.map(_.toState)
+    private var result: Option[T] = None
+
+    def canRetry(throwable: Throwable): Boolean = {
+      return policies.exists(p => p.canRetry(throwable))
     }
 
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min 
retryPolicy.maxBackoff
-
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
-
+    def makeAttempt(): Unit = {
       try {
-        return fn
+        result = Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < 
retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
+      }
+    }

Review Comment:
   Ok, I don't mind to change it. Shouldn't really matter for GC though, since 
it's short lived



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to