This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c6ba13adf2 Skip pod cleanup in case of pod creation failed (#37671)
c6ba13adf2 is described below

commit c6ba13adf278125177f561a23c601358294fa766
Author: Gopal Dirisala <39794726+dir...@users.noreply.github.com>
AuthorDate: Mon Feb 26 12:02:12 2024 +0530

    Skip pod cleanup in case of pod creation failed (#37671)
---
 airflow/providers/cncf/kubernetes/operators/pod.py    |  6 ++++--
 kubernetes_tests/test_kubernetes_pod_operator.py      | 17 ++++++++++++++++-
 tests/providers/cncf/kubernetes/operators/test_pod.py |  9 +++++++--
 3 files changed, 27 insertions(+), 5 deletions(-)

diff --git a/airflow/providers/cncf/kubernetes/operators/pod.py b/airflow/providers/cncf/kubernetes/operators/pod.py
index 93fc2b59e7..4955db7633 100644
--- a/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -793,9 +793,11 @@ class KubernetesPodOperator(BaseOperator):
             self.callbacks.on_pod_cleanup(pod=pod, client=self.client, 
mode=ExecutionMode.SYNC)
 
     def cleanup(self, pod: k8s.V1Pod, remote_pod: k8s.V1Pod):
-        # If a task got marked as failed, "on_kill" method would be called and 
the pod will be cleaned up
+        # Skip cleaning the pod in the following scenarios.
+        # 1. If a task got marked as failed, "on_kill" method would be called 
and the pod will be cleaned up
         # there. Cleaning it up again will raise an exception (which might 
cause retry).
-        if self._killed:
+        # 2. remote pod is null (ex: pod creation failed)
+        if self._killed or not remote_pod:
             return
 
         istio_enabled = self.is_istio_enabled(remote_pod)
diff --git a/kubernetes_tests/test_kubernetes_pod_operator.py b/kubernetes_tests/test_kubernetes_pod_operator.py
index 147c354dfc..21eba43663 100644
--- a/kubernetes_tests/test_kubernetes_pod_operator.py
+++ b/kubernetes_tests/test_kubernetes_pod_operator.py
@@ -206,6 +206,21 @@ class TestKubernetesPodOperatorSystem:
         assert self.expected_pod["spec"] == actual_pod["spec"]
         assert self.expected_pod["metadata"]["labels"] == 
actual_pod["metadata"]["labels"]
 
+    def test_skip_cleanup(self, mock_get_connection):
+        k = KubernetesPodOperator(
+            namespace="unknown",
+            image="ubuntu:16.04",
+            cmds=["bash", "-cx"],
+            arguments=["echo 10"],
+            labels=self.labels,
+            task_id=str(uuid4()),
+            in_cluster=False,
+            do_xcom_push=False,
+        )
+        context = create_context(k)
+        with pytest.raises(ApiException):
+            k.execute(context)
+
     def test_delete_operator_pod(self, mock_get_connection):
         k = KubernetesPodOperator(
             namespace="default",
@@ -1158,7 +1173,7 @@ class TestKubernetesPodOperatorSystem:
         # `create_pod` should be called because though there's still a pod to 
be found,
         # it will be `already_checked`
         with mock.patch(f"{POD_MANAGER_CLASS}.create_pod") as create_mock:
-            with pytest.raises(AirflowException):
+            with pytest.raises(Exception):
                 k.execute(context)
             create_mock.assert_called_once()
 
diff --git a/tests/providers/cncf/kubernetes/operators/test_pod.py b/tests/providers/cncf/kubernetes/operators/test_pod.py
index faa21eb7d7..a383895b2a 100644
--- a/tests/providers/cncf/kubernetes/operators/test_pod.py
+++ b/tests/providers/cncf/kubernetes/operators/test_pod.py
@@ -1229,16 +1229,21 @@ class TestKubernetesPodOperator:
         _, kwargs = k.client.list_namespaced_pod.call_args
         assert "already_checked!=True" in kwargs["label_selector"]
 
+    @patch(KUB_OP_PATH.format("find_pod"))
     @patch(f"{POD_MANAGER_CLASS}.delete_pod")
     @patch(f"{KPO_MODULE}.KubernetesPodOperator.patch_already_checked")
-    def test_mark_checked_unexpected_exception(self, 
mock_patch_already_checked, mock_delete_pod):
+    def test_mark_checked_unexpected_exception(
+        self, mock_patch_already_checked, mock_delete_pod, find_pod_mock
+    ):
         """If we aren't deleting pods and have an exception, mark it so we 
don't reattach to it"""
         k = KubernetesPodOperator(
             task_id="task",
             on_finish_action="keep_pod",
         )
+        found_pods = [MagicMock(), MagicMock(), MagicMock()]
+        find_pod_mock.side_effect = [None] + found_pods
         self.await_pod_mock.side_effect = AirflowException("oops")
-        context = create_context(k)
+        context = create_context(k, persist_to_db=True)
         with pytest.raises(AirflowException):
             k.execute(context=context)
         mock_patch_already_checked.assert_called_once()

Reply via email to