juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392854703


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,125 +150,72 @@ private[sql] class GrpcRetryHandler(
 private[sql] object GrpcRetryHandler extends Logging {
 
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
+   * Class managing the state of the retrying logic during a single retryable block.
+   * @param retryPolicies
+   *   list of policies to apply (in order)
    * @param sleep
-   *   The function which sleeps (takes number of milliseconds to sleep)
+   *   typically Thread.sleep
    * @param fn
-   *   The function to retry.
+   *   the function to compute
    * @tparam T
-   *   The return type of the function.
-   * @return
-   *   The result of the function.
+   *   result of function fn
    */
-  final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit = Thread.sleep)(
-      fn: => T): T = {
-    var currentRetryNum = 0
-    var exceptionList: Seq[Throwable] = Seq.empty
-    var nextBackoff: Duration = retryPolicy.initialBackoff
-
-    if (retryPolicy.maxRetries < 0) {
-      throw new IllegalArgumentException("Can't have negative number of retries")
-    }
-
-    while (currentRetryNum <= retryPolicy.maxRetries) {
-      if (currentRetryNum != 0) {
-        var currentBackoff = nextBackoff
-        nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min retryPolicy.maxBackoff
+  class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn: => T) {
+    private var currentRetryNum: Int = 0
+    private var exceptionList: Seq[Throwable] = Seq.empty
+    private val policies: Seq[RetryPolicy.RetryPolicyState] = retryPolicies.map(_.toState)
 
-        if (currentBackoff >= retryPolicy.minJitterThreshold) {
-          currentBackoff += Random.nextDouble() * retryPolicy.jitter
-        }
-
-        sleep(currentBackoff.toMillis)
-      }
+    def canRetry(throwable: Throwable): Boolean = {
+      policies.exists(p => p.canRetry(throwable))
+    }
 
+    def makeAttempt(): Option[T] = {
       try {
-        return fn
+        Some(fn)
       } catch {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+        case NonFatal(e) if canRetry(e) =>
           currentRetryNum += 1
           exceptionList = e +: exceptionList
-
-          if (currentRetryNum <= retryPolicy.maxRetries) {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"retrying (currentRetryNum=$currentRetryNum)")
-          } else {
-            logWarning(
-              s"Non-Fatal error during RPC execution: $e, " +
-                s"exceeded retries (currentRetryNum=$currentRetryNum)")
-          }
+          None
       }
     }
 
-    val exception = exceptionList.head
-    exceptionList.tail.foreach(exception.addSuppressed(_))
-    throw exception
-  }
+    def waitAfterAttempt(): Unit = {
+      // find policy which will accept this exception
+      val lastException = exceptionList.head

Review Comment:
   Thanks. Yeah, when there was just a single retry policy I put it in there,
   and was OK with it having the delay/counting towards the number of retries.
   But now that we're refactoring it, it would be better for this to live
   outside the policy.
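
   To illustrate what I mean by "outside the policy", here is a minimal
   standalone sketch (made-up names like `Policy`, `PolicyState` and `tryWait`,
   not this PR's API): each policy only describes what is retryable and how
   often, while the attempt counter and the sleep are owned by the retrying
   loop next to `fn`.

   ```scala
   import scala.util.control.NonFatal

   object RetrySketch {

     // Hypothetical simplified policy: a retryability predicate plus an attempt cap.
     case class Policy(canRetry: Throwable => Boolean, maxRetries: Int) {
       def toState: PolicyState = new PolicyState(this)
     }

     // Per-retry-block state: the attempt counter (and the sleep it triggers)
     // lives here and in the loop below, not in the immutable Policy itself.
     class PolicyState(policy: Policy) {
       private var attempts = 0
       // Returns true if this policy accepts the error; it then counts and sleeps.
       def tryWait(e: Throwable): Boolean = {
         if (policy.canRetry(e) && attempts < policy.maxRetries) {
           attempts += 1
           Thread.sleep(50L * attempts) // toy linear backoff, purely illustrative
           true
         } else {
           false
         }
       }
     }

     // The loop owns the bookkeeping and only asks each policy about retryability.
     def retry[T](policies: Seq[Policy])(fn: => T): T = {
       val states = policies.map(_.toState)
       var result: Option[T] = None
       while (result.isEmpty) {
         try {
           result = Some(fn)
         } catch {
           case NonFatal(e) =>
             // The first policy state willing to handle the error sleeps;
             // if none is, the error is rethrown and no further attempt is made.
             if (!states.exists(_.tryWait(e))) throw e
         }
       }
       result.get
     }
   }
   ```

   That way, adding a second policy doesn't require any policy to know about
   the others' delays or counters.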


