juliuszsompolski commented on code in PR #43757:
URL: https://github.com/apache/spark/pull/43757#discussion_r1391157540


##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -151,63 +153,73 @@ private[sql] class GrpcRetryHandler(
 private[sql] object GrpcRetryHandler extends Logging {
 
   /**
-   * Retries the given function with exponential backoff according to the client's retryPolicy.
-   *
-   * @param retryPolicy
-   *   The retry policy
+   * Class managing the state of the retrying logic.

Review Comment:
   nit: add "during a single call that is to be retriable."



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())

Review Comment:
   nit: I'd move these to the top of the GrpcRetryHandler object; they are a bit lost between internal functions now.
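   As a side note, the "at least 10 minutes" claim in the comment above can be sanity-checked with a small standalone sketch that mirrors DefaultPolicy's constants (jitter ignored, since it only adds time; the object name is made up):
   
   ```scala
   import scala.concurrent.duration._
   
   object BackoffCheck extends App {
     // Mirror DefaultPolicy: start at 50 ms, multiply by 4, cap at 1 minute, 15 retries.
     var backoff: Duration = 50.millis
     var total: Duration = Duration.Zero
     for (_ <- 1 to 15) {
       total += backoff
       backoff = (backoff * 4.0) min 1.minute
     }
     println(s"total tolerated wait: ${total.toSeconds} s") // ~608 s, just over 10 minutes
   }
   ```
   
   The first six waits (50 ms through 51.2 s) sum to about 68 s, and the remaining nine are capped at 60 s each, for roughly 608 s in total.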



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  class RetryPolicyState(val policy: RetryPolicy) {
+    private var numberAttempts = 0
+    private var nextWait: Duration = policy.initialBackoff
+
+    // return waiting time until next attempt, or None if has exceeded max retries
+    def nextAttempt(): Option[Duration] = {
+      if (policy.maxRetries.isDefined && numberAttempts >= policy.maxRetries.get) {
+        return None
+      }
+
+      numberAttempts += 1
+
+      var currentWait = nextWait
+      nextWait = nextWait * policy.backoffMultiplier
+      if (policy.maxBackoff.isDefined) {
+        nextWait = nextWait min policy.maxBackoff.get
+      }
+
+      if (currentWait >= policy.minJitterThreshold) {
+        currentWait += Random.nextDouble() * policy.jitter
+      }
+
+      return Some(currentWait)
+    }
+
+    def canRetry(throwable: Throwable): Boolean = policy.canRetry(throwable)
+    def getName: String = policy.getName
+  }
 
   /**
-   * An exception that can be thrown upstream when inside retry and which will be retryable
-   * regardless of policy.
+   * An exception that can be thrown upstream when inside retry and which will be always retryable
    */
   class RetryException extends Throwable
+
+  /**
+   * Represents an exception which was considered retriable but has exceeded retry limits
+   */
+  class RetriesExceeded extends Throwable

Review Comment:
   This is supposed to be used publicly; an end user may want to catch this exception.
   Maybe unnest it from the GrpcRetryHandler object to make it "public"?
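   If it is unnested, the result could look like this (a sketch only; the final location is up to the PR):
   
   ```scala
   package org.apache.spark.sql.connect.client
   
   /**
    * Represents an exception which was considered retriable but has exceeded retry limits.
    * Top-level (rather than nested in GrpcRetryHandler) so that end users can catch it directly.
    */
   class RetriesExceeded extends Throwable
   ```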



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(
-      // Please synchronize changes here with Python side:
-      // pyspark/sql/connect/client/core.py
-      //
-      // Note: these constants are selected so that the maximum tolerated wait is guaranteed
-      // to be at least 10 minutes
-      maxRetries: Int = 15,
-      initialBackoff: FiniteDuration = FiniteDuration(50, "ms"),
-      maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
-      backoffMultiplier: Double = 4.0,
-      jitter: FiniteDuration = FiniteDuration(500, "ms"),
-      minJitterThreshold: FiniteDuration = FiniteDuration(2, "s"),
-      canRetry: Throwable => Boolean = retryException) {}
+      maxRetries: Option[Int] = None,
+      initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
+      maxBackoff: Option[FiniteDuration] = None,
+      backoffMultiplier: Double = 1.0,
+      jitter: FiniteDuration = FiniteDuration(0, "s"),
+      minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
+      canRetry: Throwable => Boolean = (_ => false),
+      name: String = this.getClass.getName) {
+
+    def getName: String = name
+    def toState: RetryPolicyState = new RetryPolicyState(this)
+  }
+
+  def defaultPolicy(): RetryPolicy = RetryPolicy(
+    name = "DefaultPolicy",
+    // Please synchronize changes here with Python side:
+    // pyspark/sql/connect/client/core.py
+    //
+    // Note: these constants are selected so that the maximum tolerated wait is guaranteed
+    // to be at least 10 minutes
+    maxRetries = Some(15),
+    initialBackoff = FiniteDuration(50, "ms"),
+    maxBackoff = Some(FiniteDuration(1, "min")),
+    backoffMultiplier = 4.0,
+    jitter = FiniteDuration(500, "ms"),
+    minJitterThreshold = FiniteDuration(2, "s"),
+    canRetry = retryException)
+
+  // list of policies to be used by this client
+  def defaultPolicies(): Seq[RetryPolicy] = List(defaultPolicy())
+
+  // represents a state of the specific policy
+  // (how many retries have happened and how much to wait until next one)
+  class RetryPolicyState(val policy: RetryPolicy) {

Review Comment:
   nit: add "during a single call that is to be retriable."
   Make it a scaladoc.
   
   Could this be closed down to a `private class`?
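   With both suggestions applied, the declaration might read (a sketch, not the PR's final code):
   
   ```scala
   /**
    * Represents the state of a specific policy during a single call that is to be retriable:
    * how many retries have happened so far and how long to wait until the next one.
    */
   private class RetryPolicyState(val policy: RetryPolicy) {
     // nextAttempt(), canRetry() and getName as in the diff above
   }
   ```
   
   One caveat with `private`: `RetryPolicy.toState` currently returns a `RetryPolicyState`, so that method's visibility would have to be tightened to match.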



##########
connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala:
##########
@@ -254,22 +267,75 @@ private[sql] object GrpcRetryHandler extends Logging {
    *   Function that determines whether a retry is to be performed in the event of an error.
    */
   case class RetryPolicy(

Review Comment:
   Discussed offline that it might make sense to separate RetryPolicy from GrpcRetryHandler, i.e. separating the mechanism of retries from the policy, which is a configurable API.
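   One way that separation could look (a sketch assuming a new file next to GrpcRetryHandler.scala; the file name and scaladoc are illustrative):
   
   ```scala
   // connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/RetryPolicy.scala
   package org.apache.spark.sql.connect.client
   
   import scala.concurrent.duration.FiniteDuration
   
   /** The configurable retry API, decoupled from the gRPC retry mechanism. */
   case class RetryPolicy(
       maxRetries: Option[Int] = None,
       initialBackoff: FiniteDuration = FiniteDuration(1000, "ms"),
       maxBackoff: Option[FiniteDuration] = None,
       backoffMultiplier: Double = 1.0,
       jitter: FiniteDuration = FiniteDuration(0, "s"),
       minJitterThreshold: FiniteDuration = FiniteDuration(0, "s"),
       canRetry: Throwable => Boolean = (_ => false),
       name: String = "RetryPolicy") // the PR defaults this to the enclosing class name
   ```
   
   GrpcRetryHandler could then keep only the retry mechanism (the retry loop and RetryPolicyState) and take a list of policies as input.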


