This is an automated email from the ASF dual-hosted git repository. potiuk pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new c6ba13adf2 Skip pod cleanup in case of pod creation failed (#37671) c6ba13adf2 is described below commit c6ba13adf278125177f561a23c601358294fa766 Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com> AuthorDate: Mon Feb 26 12:02:12 2024 +0530 Skip pod cleanup in case of pod creation failed (#37671) --- airflow/providers/cncf/kubernetes/operators/pod.py | 6 ++++-- kubernetes_tests/test_kubernetes_pod_operator.py | 17 ++++++++++++++++- tests/providers/cncf/kubernetes/operators/test_pod.py | 9 +++++++-- 3 files changed, 27 insertions(+), 5 deletions(-) diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py index 93fc2b59e7..4955db7633 100644 --- a/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/airflow/providers/cncf/kubernetes/operators/pod.py @@ -793,9 +793,11 @@ class KubernetesPodOperator(BaseOperator): self.callbacks.on_pod_cleanup(pod=pod, client=self.client, mode=ExecutionMode.SYNC) def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod): - # If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up + # Skip cleaning the pod in the following scenarios. + # 1. If a task got marked as failed, "on_kill" method would be called and the pod will be cleaned up # there. Cleaning it up again will raise an exception (which might cause retry). - if self._killed: + # 2. remote pod is null (ex: pod creation failed) + if self._killed or not remote_pod: return istio_enabled = self.is_istio_enabled(remote_pod) diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py index 147c354dfc..21eba43663 100644 --- a/kubernetes_tests/test_kubernetes_pod_operator.py +++ b/kubernetes_tests/test_kubernetes_pod_operator.py @@ -206,6 +206,21 @@ class TestKubernetesPodOperatorSystem: assert self.expected_pod["spec"] == actual_pod["spec"] assert self.expected_pod["metadata"]["labels"] == actual_pod["metadata"]["labels"] + def test_skip_cleanup(self, mock_get_connection): + k = KubernetesPodOperator( + namespace="unknown", + image="ubuntu:16.04", + cmds=["bash", "-cx"], + arguments=["echo 10"], + labels=self.labels, + task_id=str(uuid4()), + in_cluster=False, + do_xcom_push=False, + ) + context = create_context(k) + with pytest.raises(ApiException): + k.execute(context) + def test_delete_operator_pod(self, mock_get_connection): k = KubernetesPodOperator( namespace="default", @@ -1158,7 +1173,7 @@ class TestKubernetesPodOperatorSystem: # `create_pod` should be called because though there's still a pod to be found, # it will be `already_checked` with mock.patch(f"{POD_MANAGER_CLASS}.create_pod") as create_mock: - with pytest.raises(AirflowException): + with pytest.raises(Exception): k.execute(context) create_mock.assert_called_once() diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py index faa21eb7d7..a383895b2a 100644 --- a/tests/providers/cncf/kubernetes/operators/test_pod.py +++ b/tests/providers/cncf/kubernetes/operators/test_pod.py @@ -1229,16 +1229,21 @@ class TestKubernetesPodOperator: _, kwargs = k.client.list_namespaced_pod.call_args assert "already_checked!=True" in kwargs["label_selector"] + @patch(KUB_OP_PATH.format("find_pod")) @patch(f"{POD_MANAGER_CLASS}.delete_pod") @patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked") - def test_mark_checked_unexpected_exception(self, mock_patch_already_checked, mock_delete_pod): + def test_mark_checked_unexpected_exception( + self, mock_patch_already_checked, mock_delete_pod, find_pod_mock + ): """If we aren't deleting pods and have an exception, mark it so we don't reattach to it""" k = KubernetesPodOperator( task_id="task", on_finish_action="keep_pod", ) + found_pods = [MagicMock(), MagicMock(), MagicMock()] + find_pod_mock.side_effect = [None] + found_pods self.await_pod_mock.side_effect = AirflowException("oops") - context = create_context(k) + context = create_context(k, persist_to_db=True) with pytest.raises(AirflowException): k.execute(context=context) mock_patch_already_checked.assert_called_once()