gaogaotiantian commented on code in PR #53016:
URL: https://github.com/apache/spark/pull/53016#discussion_r2532522626
##########
python/pyspark/util.py:
##########
@@ -917,6 +918,59 @@ def default_api_mode() -> str:
return "classic"
+class _FaultHandlerIntegration:
+    def __init__(self) -> None:
+        self._log_path: Optional[str] = None
+        self._log_file: Optional[TextIO] = None
+        self._periodic_dump = False
+
+    def start(self) -> None:
+        self._log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+        tracebackDumpIntervalSeconds = os.environ.get(
Review Comment:
I understand this variable was named like this originally, probably for
historical reasons. But since we are refactoring it anyway, maybe rename it to
follow PEP8 (personally I think it could also be shorter).
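For example, the rename suggested above might look like the following (the shorter snake_case name `dump_interval` is a hypothetical choice, not a prescribed one):

```python
import os

# camelCase `tracebackDumpIntervalSeconds` renamed per PEP8; `dump_interval`
# is just one possible shorter name.
dump_interval = os.environ.get("PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None)
```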
##########
python/pyspark/util.py:
##########
@@ -917,6 +918,59 @@ def default_api_mode() -> str:
return "classic"
+class _FaultHandlerIntegration:
+    def __init__(self) -> None:
+        self._log_path: Optional[str] = None
+        self._log_file: Optional[TextIO] = None
+        self._periodic_dump = False
+
+    def start(self) -> None:
+        self._log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+        tracebackDumpIntervalSeconds = os.environ.get(
+            "PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None
+        )
+        if self._log_path:
+            self._log_path = os.path.join(self._log_path, str(os.getpid()))
+            self._log_file = open(self._log_path, "w")
+
+        faulthandler.enable(file=self._log_file)
+
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            self._periodic_dump = True
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
+    def stop(self) -> None:
+        if self._periodic_dump:
+            faulthandler.cancel_dump_traceback_later()
+            self._periodic_dump = False
+        if self._log_path:
+            faulthandler.disable()
+            if self._log_file:
+                self._log_file.close()
+                self._log_file = None
+            os.remove(self._log_path)
+            self._log_path = None
+
+
+def with_fault_handler(func: Callable) -> Callable:
+    """
+    Registers a fault handler for the duration of function execution.
+    After function execution is over, the faulthandler registration is cleaned
+    up as well, including any files created for the integration. Not reentrant.
+    """
+
+    @functools.wraps(func)
+    def wrapper(*args: Any, **kwargs: Any) -> Any:
+        fault_handler = _FaultHandlerIntegration()
Review Comment:
I have a concern here about nested calls. I know that in all our current usage
this is only applied to the `main` function (maybe?), but `faulthandler` itself
is effectively a global singleton, and the log file name is based purely on the
pid, so if we accidentally applied the decorator in a nested way it could be
pretty bad.
There could be multiple ways to solve it. If we are not planning to expose
`_FaultHandlerIntegration`, we can simply create a singleton object
`_faulthandler_helper = _FaultHandlerIntegration()` and use that in the actual
API `with_fault_handler`. That way we can check whether another one is already
running and either raise an exception or make it a no-op.
We could also make the decorator a method of the class, to keep all the
related code together so it's easier to find everything.
Then all you need is
```python
with_faulthandler = _faulthandler_helper.with_faulthandler
```
to expose the API.
I also think `faulthandler` should be spelled as a single word everywhere,
just like the library, and the `fault_handler` local here should be renamed to
something else (`_faulthandler_helper`, for example). Actually
`_FaultHandlerIntegration` could have a better name as well.
##########
python/pyspark/util.py:
##########
@@ -917,6 +918,59 @@ def default_api_mode() -> str:
return "classic"
+class _FaultHandlerIntegration:
+    def __init__(self) -> None:
+        self._log_path: Optional[str] = None
+        self._log_file: Optional[TextIO] = None
+        self._periodic_dump = False
+
+    def start(self) -> None:
+        self._log_path = os.environ.get("PYTHON_FAULTHANDLER_DIR", None)
+        tracebackDumpIntervalSeconds = os.environ.get(
+            "PYTHON_TRACEBACK_DUMP_INTERVAL_SECONDS", None
+        )
+        if self._log_path:
+            self._log_path = os.path.join(self._log_path, str(os.getpid()))
+            self._log_file = open(self._log_path, "w")
+
+        faulthandler.enable(file=self._log_file)
+
+        if tracebackDumpIntervalSeconds is not None and int(tracebackDumpIntervalSeconds) > 0:
+            self._periodic_dump = True
+            faulthandler.dump_traceback_later(int(tracebackDumpIntervalSeconds), repeat=True)
+
+    def stop(self) -> None:
+        if self._periodic_dump:
+            faulthandler.cancel_dump_traceback_later()
Review Comment:
`faulthandler.cancel_dump_traceback_later()` is safe to call unconditionally
(it is a no-op if `dump_traceback_later` was never called), so I think you can
get rid of `self._periodic_dump`.
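A quick sketch of that claim, which matches the stdlib documentation for `faulthandler.cancel_dump_traceback_later`:

```python
import faulthandler

# Safe even though dump_traceback_later was never called: this is a no-op.
faulthandler.cancel_dump_traceback_later()

# And it cancels a scheduled periodic dump when there is one.
faulthandler.dump_traceback_later(3600, repeat=True)
faulthandler.cancel_dump_traceback_later()
```

So `stop()` could call it unconditionally, dropping the `self._periodic_dump` flag entirely.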
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]