HyukjinKwon commented on code in PR #39695:
URL: https://github.com/apache/spark/pull/39695#discussion_r1089710485


##########
python/pyspark/sql/connect/client.py:
##########
@@ -636,6 +671,139 @@ def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn:
             raise SparkConnectGrpcException(str(rpc_error)) from None
 
 
+class RetryState:
+    """
+    Simple state helper that captures the state between retries. It keeps
+    track of the last exception thrown and the total number of retries. When
+    the task finishes successfully, done() returns True.
+    """
+
+    def __init__(self) -> None:
+        self._exception: Optional[BaseException] = None
+        self._done = False
+        self._count = 0
+
+    def set_exception(self, exc: Optional[BaseException]) -> None:
+        self._exception = exc
+        self._count += 1
+
+    def exception(self) -> Optional[BaseException]:
+        return self._exception
+
+    def set_done(self) -> None:
+        self._done = True
+
+    def count(self) -> int:
+        return self._count
+
+    def done(self) -> bool:
+        return self._done
+
+
+class AttemptManager:
+    """
+    Simple ContextManager that is used to capture the exception thrown inside
+    the context.
+    """
+
+    def __init__(self, check: Callable[..., bool], retry_state: RetryState) -> None:
+        self._retry_state = retry_state
+        self._can_retry = check
+
+    def __enter__(self) -> None:
+        pass
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        if isinstance(exc_val, BaseException):
+            # Swallow the exception.
+            if self._can_retry(exc_val):
+                self._retry_state.set_exception(exc_val)
+                return True
+            # Bubble up the exception.
+            return False
+        else:
+            self._retry_state.set_done()
+            return None
+
+
+class Retrying:
+    """
+    This helper class is used as a generator together with a context manager to
+    allow retrying exceptions in particular code blocks. Retrying can be
+    configured with a lambda function that filters which kinds of exceptions
+    should be retried.
+
+    In addition, there are several parameters that are used to configure the
+    exponential backoff behavior.
+
+    An example of using this class looks like this:
+
+    .. code-block:: python
+
+        for attempt in Retrying(lambda x: isinstance(x, TransientError)):
+            with attempt:
+                # Do the work.
+                ...
+
+    """
+
+    def __init__(
+        self,
+        can_retry: Callable[..., bool] = lambda x: True,
+        max_retries: int = 15,
+        initial_backoff: int = 50,
+        max_backoff: int = 60000,
+        backoff_multiplier: float = 4.0,
+    ) -> None:
+        self._can_retry = can_retry
+        self._max_retries = max_retries
+        self._initial_backoff = initial_backoff
+        self._max_backoff = max_backoff
+        self._backoff_multiplier = backoff_multiplier
+
+    def __iter__(self) -> Generator[AttemptManager, None, None]:

Review Comment:
   The example you provided (https://tenacity.readthedocs.io/en/latest/) uses a
   decorator, which makes a lot of sense. But with a generator and context
   manager, I don't feel it's easier to follow, since there's nothing to do with
   `attempt`.
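
   For readers following the thread, here is a minimal, hypothetical sketch of
   what the (truncated) `Retrying.__iter__` body could look like, assuming the
   `RetryState` and `AttemptManager` classes from the diff above are in scope
   and that the backoff values are milliseconds, as the defaults suggest. It
   only illustrates the generator-plus-context-manager pattern under
   discussion, not the actual implementation in the PR:

   ```python
   import random
   import time
   from typing import Generator

   # Hypothetical body for Retrying.__iter__; RetryState and AttemptManager
   # are assumed to be the classes defined in the diff above.
   def __iter__(self) -> Generator["AttemptManager", None, None]:
       retry_state = RetryState()
       backoff = float(self._initial_backoff)
       while True:
           # Each yielded AttemptManager records a retryable exception raised
           # inside the caller's `with attempt:` block, or marks success.
           yield AttemptManager(self._can_retry, retry_state)
           if retry_state.done():
               return  # the block finished without raising
           if retry_state.count() > self._max_retries:
               exc = retry_state.exception()
               assert exc is not None
               raise exc  # retries exhausted; surface the last exception
           # Exponential backoff with jitter, capped at max_backoff (ms).
           time.sleep(random.uniform(0, min(backoff, self._max_backoff)) / 1000.0)
           backoff *= self._backoff_multiplier
   ```

   Under such an implementation, the work `attempt` does is hidden in
   `AttemptManager.__exit__`, which classifies the raised exception as
   retryable (swallowed) or fatal (re-raised). A caller would use it like the
   docstring example, where `perform_rpc` is a hypothetical function that may
   raise a `TransientError`:

   ```python
   for attempt in Retrying(lambda x: isinstance(x, TransientError)):
       with attempt:
           result = perform_rpc()
   ```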



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

