Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-30 Thread via GitHub


kaxil merged PR #44465:
URL: https://github.com/apache/airflow/pull/44465


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1864034492


##
task_sdk/tests/execution_time/test_supervisor.py:
##
@@ -430,6 +432,216 @@ def test_heartbeat_failures_handling(self, monkeypatch, 
mocker, captured_logs, t
 } in captured_logs
 
 
+class TestWatchedSubprocessKill:
+@pytest.fixture
+def mock_process(self, mocker):
+process = mocker.Mock(spec=psutil.Process)
+process.pid = 12345
+return process
+
+@pytest.fixture
+def watched_subprocess(self, mocker, mock_process):
+proc = WatchedSubprocess(
+ti_id=TI_ID,
+pid=12345,
+stdin=mocker.Mock(),
+client=mocker.Mock(),
+process=mock_process,
+)
+# Mock the selector
+mock_selector = mocker.Mock(spec=selectors.DefaultSelector)
+mock_selector.select.return_value = []
+
+# Set the selector on the process
+proc.selector = mock_selector
+return proc
+
+@pytest.mark.parametrize(
+["signal_to_send", "wait_side_effect", "expected_signals"],
+[
+pytest.param(
+signal.SIGINT,
+[0],
+[signal.SIGINT],
+id="SIGINT-success-without-escalation",
+),
+pytest.param(
+signal.SIGINT,
+[psutil.TimeoutExpired(0.1), 0],
+[signal.SIGINT, signal.SIGTERM],
+id="SIGINT-escalates-to-SIGTERM",
+),
+pytest.param(
+signal.SIGINT,
+[
+psutil.TimeoutExpired(0.1),  # SIGINT times out
+psutil.TimeoutExpired(0.1),  # SIGTERM times out
+0,  # SIGKILL succeeds
+],
+[signal.SIGINT, signal.SIGTERM, signal.SIGKILL],
+id="SIGINT-escalates-to-SIGTERM-then-SIGKILL",
+),
+pytest.param(
+signal.SIGTERM,
+[
+psutil.TimeoutExpired(0.1),  # SIGTERM times out
+0,  # SIGKILL succeeds
+],
+[signal.SIGTERM, signal.SIGKILL],
+id="SIGTERM-escalates-to-SIGKILL",
+),
+pytest.param(
+signal.SIGKILL,
+[0],
+[signal.SIGKILL],
+id="SIGKILL-success-without-escalation",
+),
+],
+)
+def test_force_kill_escalation(
+self,
+watched_subprocess,
+mock_process,
+mocker,
+signal_to_send,
+wait_side_effect,
+expected_signals,
+captured_logs,
+):
+"""Test escalation path for SIGINT, SIGTERM, and SIGKILL when 
force=True."""
+# Mock the process wait method to return the exit code or raise an 
exception
+mock_process.wait.side_effect = wait_side_effect
+
+watched_subprocess.kill(signal_to_send=signal_to_send, 
escalation_delay=0.1, force=True)
+
+# Check that the correct signals were sent
+mock_process.send_signal.assert_has_calls([mocker.call(sig) for sig in 
expected_signals])
+
+# Check that the process was waited on for each signal
+mock_process.wait.assert_has_calls([mocker.call(timeout=0)] * 
len(expected_signals))
+
+## Validate log messages
+# If escalation occurred, we should see a warning log for each signal 
sent
+if len(expected_signals) > 1:
+assert {
+"event": "Process did not terminate in time; escalating",
+"level": "warning",

Review Comment:
   This could be changed to debug too!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1864004089


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)

Review Comment:
   Updated in 
https://github.com/apache/airflow/pull/44465/commits/1e7dc464f5982e4b7b2b8c3a5f5585d6da53b83d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1864004174


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   sure, changed the approach in 
https://github.com/apache/airflow/pull/44465/commits/1e7dc464f5982e4b7b2b8c3a5f5585d6da53b83d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863860334


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   interesting :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863580386


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   
https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1278-L1291
   
   `psutil` manages some of that for us -- i.e. we don't have to check it 
explicitly ourselves (re SIGTERM):
   
   ```py
   def send_signal(self, sig):
   """Send a signal *sig* to process pre-emptively checking
   whether PID has been reused (see signal module constants) .
   On Windows only SIGTERM is valid and is treated as an alias
   for kill().
   """
   if POSIX:
   self._send_signal(sig)
   else:  # pragma: no cover
   self._raise_if_pid_reused()
   if sig != signal.SIGTERM and not self.is_running():
   msg = "process no longer exists"
   raise NoSuchProcess(self.pid, self._name, msg=msg)
   self._proc.send_signal(sig)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863580386


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   
https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1278-L1291
   
   psutil manages some of that for us:
   
   ```py
   def send_signal(self, sig):
   """Send a signal *sig* to process pre-emptively checking
   whether PID has been reused (see signal module constants) .
   On Windows only SIGTERM is valid and is treated as an alias
   for kill().
   """
   if POSIX:
   self._send_signal(sig)
   else:  # pragma: no cover
   self._raise_if_pid_reused()
   if sig != signal.SIGTERM and not self.is_running():
   msg = "process no longer exists"
   raise NoSuchProcess(self.pid, self._name, msg=msg)
   self._proc.send_signal(sig)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Not to mention that `fork` also does not **really** work well on MacOS 
(because on MacOS system libraries might start threads, which does not play well 
with fork: https://github.com/python/cpython/issues/77906), so if we would 
like to have explicit support for "production" MacOS (and later Windows) for 
"workers" - we will have to make some serious `ifs` anyway.
   
   BTW, in Python 3.14 there are some changes coming that **might** finally 
result in a possibility of using fork() in production for MacOS 
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 - maybe we should already add "get_context()" and "set_start_method" to 
explicitly set "fork" on systems that support it (minus MacOS :)) - as this will 
be a breaking change in Python 3.14.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Not to mention that `fork` also does not **really** work well on MacOS 
(because on MacOS system libraries might start threads, which does not play well 
with fork: https://github.com/python/cpython/issues/77906), so if we would 
like to have explicit support for "production" MacOS (and later Windows) for 
"workers" - we will have to make some serious `ifs` anyway.
   
   BTW, in Python 3.14 there are some changes coming that **might** finally 
result in a possibility of using fork() in production for MacOS 
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 - maybe we should already add "get_context()" and "set_start_method" to 
explicitly set "fork" on systems that support it (minus MacOS :)) - as this will 
be a breaking change in Python 3.14.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Not to mention that `fork` also does not **really** work well on MacOS 
(because on MacOS system libraries might start threads, which does not play well 
with fork: https://github.com/python/cpython/issues/77906), so if we would 
like to have explicit support for MacOS (and later Windows) for "workers" - we 
will have to make some serious `ifs` anyway.
   
   BTW, in Python 3.14 there are some changes coming that **might** finally 
result in a possibility of using fork() in production for MacOS 
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 - maybe we should already add "get_context()" and "set_start_method" to 
explicitly set "fork" on systems that support it (minus MacOS :)) - as this will 
be a breaking change in Python 3.14.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Not to mention that `fork` also does not **really** work well on MacOS 
(because on MacOS system libraries might start threads, which does not play well 
with fork: https://github.com/python/cpython/issues/77906), so if we would 
like to have explicit support for MacOS (and later Windows) for "workers" - we 
will have to make some serious ifs.
   
   BTW, in Python 3.14 there are some changes coming that **might** finally 
result in a possibility of using fork() in production for MacOS 
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 - maybe we should already add "get_context()" and "set_start_method" to 
explicitly set "fork" on systems that support it (minus MacOS :)) - as this will 
be a breaking change in Python 3.14.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863576653


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Not to mention that `fork` also does not **really** work well on MacOS 
(because on MacOS system libraries might start threads, which does not play well 
with fork: https://github.com/python/cpython/issues/77906), so if we would 
like to have explicit support for MacOS (and later Windows) for "workers" - we 
will have to make some serious ifs.
   
   BTW, in Python 3.14 there are some changes coming that **might** finally 
result in a possibility of using fork() in production for MacOS 
https://docs.python.org/3/library/multiprocessing.html#multiprocessing-start-methods
 - maybe we should already add "get_context()" and "set_start_method" to 
explicitly set "fork" on systems that support it (minus MacOS :)) - as this will 
be a breaking change in Python 3.14.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863564022


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   > Though we should try and support non-UNIX (i.e. Windows) which doesn't 
have process groups.
   
   SIGTERM does not work on Windows either, so we will have to do it completely 
differently if we want to support Windows. 
   
   See for example here: 
https://stackoverflow.com/questions/47306805/signal-sigterm-not-received-by-subprocess-on-windows



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863177225


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   That's fine I think, as SIGKILL is uncatchable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863171511


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Though we should try and support non-UNIX (i.e. Windows) which doesn't have 
process groups.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863168995


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Yes we should create a new process group, there is already a todo for that, 
so that will be in a different PR. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863167664


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)

Review Comment:
   No not really. The flow could be something like this:
   
   1. We send SIGINT to the process.
   2. It catches it, and sends a log message
   3. The first call to `self.selector.select` would catch that, then enter wait
   4. The subprocess now sends a request to, for example, update an XCom value, 
and blocks waiting to read the response
   5. The supervisor is blocking in wait
   6. Supervisor wait times out, we then send sigkill.
   
   Remember, the socket being closed (which also happens when the process is 
hard killed with kill -9) counts as an event in the selector.
   
   We want to call this again.
   
   ```python
   events = self.selector.select(timeout=escalation_delay)
   self._process_file_object_events(events)
   
   self._check_subprocess_exit()
   ```
   
   and I suspect all three of those calls should be moved into 
`_process_file_object_events` and it renamed, so that the call is something 
like:
   
   ```python
   def _service_subprocess(max_time: float):
   events = self.selector.select(timeout=max_time)
   for key, _ in events:
   socket_handler = key.data
   
   
   self._check_subprocess_exit()
   ```
   
   That way `_monitor_subprocess` and `kill` can both simply call 
`_service_subprocess`. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863114700


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Following discussion we had in 
https://github.com/apache/airflow/pull/41329#issuecomment-2506804789. ->  
should we create a new process group when we start task via supervisor and send 
the signal to process group ? 
   
   This handles much better the situation where one of the child processes does not 
propagate the signals (which happens, for example, by default in `bash`).
   
   Typical approach here is to start a new process group with same id as the 
process it starts (setpgid(0,0)) right after we fork from supervisor:
   
   ```python
   # Check if we are not a group leader already (We should not be)
   if os.getpid() != os.getsid(0):
   # and create a new process group where we are the leader
   os.setpgid(0, 0)
   ```
   
   And then instead of sending signal to process you send it to process group
   
   ```python
os.killpg(gid, signal.SIGTERM)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863120669


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   This is, for example, what Ctrl-C does by default when you have an interactive 
process - the INT signal (SIGINT) is sent to the foreground process group that the shell 
sets for the foreground process - not **just** to the process. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-29 Thread via GitHub


potiuk commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1863114700


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -403,12 +403,57 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+force: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param force: If True, send the signal immediately without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if not force:
+with suppress(ProcessLookupError):
+self._process.send_signal(signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+self._process.send_signal(sig)

Review Comment:
   Following discussion we had in 
https://github.com/apache/airflow/pull/41329#issuecomment-2506804789. ->  
should we create a new process group when we start task via supervisor and send 
the signal to process group ? 
   
   This handles much better the situation where one of the child processes does not 
propagate the signals (which happens, for example, by default in bash).
   
   Typical approach here is to start a new process group with same id as the 
process it starts (setpgid(0,0)) right after we fork from supervisor:
   
   ```python
   # Check if we are not a group leader already (We should not be)
   if os.getpid() != os.getsid(0):
   # and create a new process group where we are the leader
   os.setpgid(0, 0)
   ```
   
   And then instead of sending signal to process you send it to process group
   
   ```python
os.killpg(gid, signal.SIGTERM)
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862814924


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)

Review Comment:
   Does 
https://github.com/apache/airflow/pull/44465/commits/21fa9cbe3313fbca77964d1f83853c8f823b246d
 work? or do you think we should move it after `wait`? 
   
   I did it before `wait`, assuming most processes would have already ended by the time 
`self._process.wait` returns or raises an exception



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862814924


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)

Review Comment:
   does 
https://github.com/apache/airflow/pull/44465/commits/21fa9cbe3313fbca77964d1f83853c8f823b246d
 work?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862802176


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   it is also called here though:
   
   
https://github.com/apache/airflow/blob/840018a4264a841acbd41ca72f07938331abd3b8/task_sdk/src/airflow/sdk/execution_time/supervisor.py#L330-L339



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862773600


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)

Review Comment:
   oh nice, done in 
https://github.com/apache/airflow/pull/44465/commits/e6105241a0d56d4fac90cdfe96e75e379551ebd1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862732883


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   This "should" be impossible to hit, assuming it's from the wait, not the 
os.kill call, as even if the process has exited, the process hangs around as a 
Zombie (on Linux this shows up as `<defunct>` in `ps aux`) until something 
`wait`s on it. And as long as our process is alive, nothing else should wait on 
it. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862731293


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)

Review Comment:
   In which case, use `self._process.send_signal(sig)` -- that does the same 
https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1278-L1291



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862730091


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)
+log.debug("Process exited", pid=self.pid, 
exit_code=self._exit_code, signal=sig.name)
+return
+except psutil.TimeoutExpired:
+log.warning("Process did not terminate in time; escalating", 
pid=self.pid, signal=sig.name)
+except psutil.NoSuchProcess:
+log.debug("Process already terminated", pid=self.pid)
+self._exit_code = 0

Review Comment:
   
https://github.com/apache/airflow/pull/44465/commits/63ac85ef632c26a3a5b2f989a2c5cb804cebd3b2



##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   
https://github.com/apache/airflow/pull/44465/commits/63ac85ef632c26a3a5b2f989a2c5cb804cebd3b2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862728650


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   I went back and forth on it -- as I was playing around with the default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862727780


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)
+log.debug("Process exited", pid=self.pid, 
exit_code=self._exit_code, signal=sig.name)
+return
+except psutil.TimeoutExpired:
+log.warning("Process did not terminate in time; escalating", 
pid=self.pid, signal=sig.name)
+except psutil.NoSuchProcess:
+log.debug("Process already terminated", pid=self.pid)
+self._exit_code = 0

Review Comment:
   Hmm, my initial thought was that the process could have terminated, but yeah, 
that's an assumption -- will change it to -1.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862726361


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)

Review Comment:
   We could get some free niceties by directly calling 
`process.{kill,terminate}` -- some additional checks and cross-platform support:
   
   
https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1326-L1334
   
   ```py
   def kill(self):
       """Kill the current process with SIGKILL pre-emptively checking
       whether PID has been reused.
       """
       if POSIX:
           self._send_signal(signal.SIGKILL)
       else:  # pragma: no cover
           self._raise_if_pid_reused()
           self._proc.kill()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


kaxil commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862726361


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)

Review Comment:
   We could get some free goodies by directly calling 
`process.{kill,terminate}` -- some additional checks and cross-platform support:
   
   
https://github.com/giampaolo/psutil/blob/master/psutil/__init__.py#L1326-L1334
   
   ```py
   def kill(self):
       """Kill the current process with SIGKILL pre-emptively checking
       whether PID has been reused.
       """
       if POSIX:
           self._send_signal(signal.SIGKILL)
       else:  # pragma: no cover
           self._raise_if_pid_reused()
           self._proc.kill()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Add escalation path for killing process in Supervisor [airflow]

2024-11-28 Thread via GitHub


ashb commented on code in PR #44465:
URL: https://github.com/apache/airflow/pull/44465#discussion_r1862719032


##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:

Review Comment:
   Let's call this `force`



##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+escalation_path = [signal.SIGINT, signal.SIGTERM, signal.SIGKILL]
+if signal_to_send in escalation_path:
+# Start from `initial_signal`
+escalation_path = 
escalation_path[escalation_path.index(signal_to_send) :]
+
+for sig in escalation_path:
+try:
+if sig == signal.SIGKILL:
+self._process.kill()
+elif sig == signal.SIGTERM:
+self._process.terminate()
+else:
+os.kill(self.pid, sig)
+
+self._exit_code = self._process.wait(timeout=escalation_delay)

Review Comment:
   We probably want to service the sockets while we are waiting to kill it 
(collect any logs, and it may well try to report the final state of the task 
before it exits, which we will need to handle!)



##
task_sdk/src/airflow/sdk/execution_time/supervisor.py:
##
@@ -391,12 +391,55 @@ def _send_startup_message(self, ti: TaskInstance, path: 
str | os.PathLike[str],
 self.stdin.write(msg.model_dump_json().encode())
 self.stdin.write(b"\n")
 
-def kill(self, signal: signal.Signals = signal.SIGINT):
+def kill(
+self,
+signal_to_send: signal.Signals = signal.SIGINT,
+escalation_delay: float = 5.0,
+bypass_escalation: bool = False,
+):
+"""
+Attempt to terminate the subprocess with a given signal.
+
+If the process does not exit within `escalation_delay` seconds, 
escalate to SIGTERM and eventually SIGKILL if necessary.
+
+:param signal_to_send: The signal to send initially (default is 
SIGINT).
+:param escalation_delay: Time in seconds to wait before escalating to 
a stronger signal.
+:param bypass_escalation: If True, send the signal directly to the 
process without escalation.
+"""
 if self._exit_code is not None:
 return
 
-with suppress(ProcessLookupError):
-os.kill(self.pid, signal)
+if bypass_escalation:
+with suppress(ProcessLookupError):
+os.kill(self.pid, signal_to_send)
+return
+
+# Escalation sequence: SIGINT -> SIGTERM -> SIGKILL
+