grundprinzip commented on code in PR #39695:
URL: https://github.com/apache/spark/pull/39695#discussion_r1084080895
##########
python/pyspark/sql/connect/client.py:
##########
@@ -636,6 +671,139 @@ def _handle_error(self, rpc_error: grpc.RpcError) ->
NoReturn:
raise SparkConnectGrpcException(str(rpc_error)) from None
class RetryState:
    """Mutable bookkeeping shared across retry attempts.

    Records the most recent exception thrown, the total number of failed
    attempts, and whether the guarded task eventually completed
    successfully (in which case ``done()`` returns ``True``).
    """

    def __init__(self) -> None:
        self._last_exception: Optional[BaseException] = None
        self._finished = False
        self._attempts = 0

    def set_exception(self, exc: Optional[BaseException]) -> None:
        """Record one failed attempt and remember its exception."""
        self._last_exception = exc
        self._attempts += 1

    def exception(self) -> Optional[BaseException]:
        """Return the exception recorded by the most recent failed attempt."""
        return self._last_exception

    def set_done(self) -> None:
        """Mark the guarded task as completed successfully."""
        self._finished = True

    def count(self) -> int:
        """Return how many attempts have failed so far."""
        return self._attempts

    def done(self) -> bool:
        """Return ``True`` once the task has completed successfully."""
        return self._finished
+
+
class AttemptManager:
    """Context manager that captures the exception raised by one attempt.

    On exit it inspects any exception raised inside the ``with`` body:
    exceptions the ``check`` predicate accepts are swallowed and recorded
    on the shared :class:`RetryState`; all others propagate to the caller.
    A clean exit marks the retry state as done.
    """

    def __init__(self, check: Callable[..., bool], retry_state: RetryState) -> None:
        # Predicate deciding whether a given exception may be retried.
        self._can_retry = check
        # Shared state updated after every attempt.
        self._retry_state = retry_state

    def __enter__(self) -> None:
        pass

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> Optional[bool]:
        if not isinstance(exc_val, BaseException):
            # No exception was raised: this attempt succeeded.
            self._retry_state.set_done()
            return None
        if self._can_retry(exc_val):
            # Retryable: record it and suppress so the caller may retry.
            self._retry_state.set_exception(exc_val)
            return True
        # Non-retryable: let the exception bubble up.
        return False
+
+
+class Retrying:
+ """
+ This helper class is used as a generator together with a context manager to
+ allow retrying exceptions in particular code blocks. The Retrying can be
configured
+ with a lambda function that is can be filtered what kind of exceptions
should be
+ retried.
+
+ In addition, there are several parameters that are used to configure the
exponential
+ backoff behavior.
+
+ An example to use this class looks like this:
+
+ .. code-block:: python
+
+ for attempt in Retrying(lambda x: isinstance(x, TransientError)):
+ with attempt:
+ # do the work.
+
+ """
+
+ def __init__(
+ self,
+ can_retry: Callable[..., bool] = lambda x: True,
+ max_retries: int = 15,
+ initial_backoff: int = 50,
+ max_backoff: int = 60000,
+ backoff_multiplier: float = 4.0,
+ ) -> None:
+ self._can_retry = can_retry
+ self._max_retries = max_retries
+ self._initial_backoff = initial_backoff
+ self._max_backoff = max_backoff
+ self._backoff_multiplier = backoff_multiplier
+
+ def __iter__(self) -> Generator[AttemptManager, None, None]:
Review Comment:
This is a workaround for a Python limitation. To be able to retry a block of
code you need to wrap it in a generator, because a context manager cannot
yield multiple times.
So the context manager provides the yielding of the state and the ability to
capture the exception, while the generator provides the actual retries.
I took the inspiration from here: https://tenacity.readthedocs.io/en/latest/
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]