cdkrot commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1392847382
##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,125 +150,72 @@ private[sql] class GrpcRetryHandler(
private[sql] object GrpcRetryHandler extends Logging {
/**
- * Retries the given function with exponential backoff according to the
client's retryPolicy.
- *
- * @param retryPolicy
- * The retry policy
+ * Class managing the state of the retrying logic during a single retryable
block.
+ * @param retryPolicies
+ * list of policies to apply (in order)
* @param sleep
- * The function which sleeps (takes number of milliseconds to sleep)
+ * typically Thread.sleep
* @param fn
- * The function to retry.
+ * the function to compute
* @tparam T
- * The return type of the function.
- * @return
- * The result of the function.
+ * result of function fn
*/
- final def retry[T](retryPolicy: RetryPolicy, sleep: Long => Unit =
Thread.sleep)(
- fn: => T): T = {
- var currentRetryNum = 0
- var exceptionList: Seq[Throwable] = Seq.empty
- var nextBackoff: Duration = retryPolicy.initialBackoff
-
- if (retryPolicy.maxRetries < 0) {
- throw new IllegalArgumentException("Can't have negative number of
retries")
- }
-
- while (currentRetryNum <= retryPolicy.maxRetries) {
- if (currentRetryNum != 0) {
- var currentBackoff = nextBackoff
- nextBackoff = nextBackoff * retryPolicy.backoffMultiplier min
retryPolicy.maxBackoff
+ class Retrying[T](retryPolicies: Seq[RetryPolicy], sleep: Long => Unit, fn:
=> T) {
+ private var currentRetryNum: Int = 0
+ private var exceptionList: Seq[Throwable] = Seq.empty
+ private val policies: Seq[RetryPolicy.RetryPolicyState] =
retryPolicies.map(_.toState)
- if (currentBackoff >= retryPolicy.minJitterThreshold) {
- currentBackoff += Random.nextDouble() * retryPolicy.jitter
- }
-
- sleep(currentBackoff.toMillis)
- }
+ def canRetry(throwable: Throwable): Boolean = {
+ policies.exists(p => p.canRetry(throwable))
+ }
+ def makeAttempt(): Option[T] = {
try {
- return fn
+ Some(fn)
} catch {
- case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum <
retryPolicy.maxRetries =>
+ case NonFatal(e) if canRetry(e) =>
currentRetryNum += 1
exceptionList = e +: exceptionList
-
- if (currentRetryNum <= retryPolicy.maxRetries) {
- logWarning(
- s"Non-Fatal error during RPC execution: $e, " +
- s"retrying (currentRetryNum=$currentRetryNum)")
- } else {
- logWarning(
- s"Non-Fatal error during RPC execution: $e, " +
- s"exceeded retries (currentRetryNum=$currentRetryNum)")
- }
+ None
}
}
- val exception = exceptionList.head
- exceptionList.tail.foreach(exception.addSuppressed(_))
- throw exception
- }
+ def waitAfterAttempt(): Unit = {
+ // find policy which will accept this exception
+ val lastException = exceptionList.head
Review Comment:
OK, I didn't realize that RetryException is supposed to be retried immediately,
even without policies.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]