[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b Author: Daniel Imberman AuthorDate: Thu Jul 30 11:40:23 2020 -0700 Fixes PodMutationHook for backwards compatibility (#9903) Co-authored-by: Daniel Imberman Co-authored-by: Kaxil Naik --- airflow/kubernetes/k8s_model.py | 16 +++ airflow/kubernetes/pod.py| 33 -- airflow/kubernetes/pod_launcher.py | 26 +++- airflow/kubernetes/pod_launcher_helper.py| 96 +++ airflow/kubernetes/volume_mount.py | 1 + airflow/kubernetes_deprecated/__init__.py| 16 +++ airflow/kubernetes_deprecated/pod.py | 171 +++ docs/conf.py | 1 + tests/kubernetes/models/test_pod.py | 81 + tests/kubernetes/test_pod_launcher_helper.py | 97 +++ tests/test_local_settings.py | 96 +++ 11 files changed, 619 insertions(+), 15 deletions(-) diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py index 3fd2f9e..e10a946 100644 --- a/airflow/kubernetes/k8s_model.py +++ b/airflow/kubernetes/k8s_model.py @@ -29,6 +29,7 @@ else: class K8SModel(ABC): + """ These Airflow Kubernetes models are here for backwards compatibility reasons only. Ideally clients should use the kubernetes api @@ -39,6 +40,7 @@ class K8SModel(ABC): can be avoided. All of these models implement the `attach_to_pod` method so that they integrate with the kubernetes client. """ + @abc.abstractmethod def attach_to_pod(self, pod): """ @@ -47,9 +49,23 @@ class K8SModel(ABC): :return: The pod with the object attached """ +def as_dict(self): +res = {} +if hasattr(self, "__slots__"): +for s in self.__slots__: +if hasattr(self, s): +res[s] = getattr(self, s) +if hasattr(self, "__dict__"): +res_dict = self.__dict__.copy() +res_dict.update(res) +return res_dict +return res + def append_to_pod(pod, k8s_objects): """ +Attach Kubernetes objects to the given POD + :param pod: A pod to attach a list of Kubernetes objects to :type pod: kubernetes.client.models.V1Pod :param k8s_objects: a potential None list of K8SModels diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 0b332c2..9e455af 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel class Resources(K8SModel): -__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu') +__slots__ = ('request_memory', + 'request_cpu', + 'limit_memory', + 'limit_cpu', + 'limit_gpu', + 'request_ephemeral_storage', + 'limit_ephemeral_storage') """ :param request_memory: requested memory @@ -44,15 +50,17 @@ class Resources(K8SModel): :param limit_ephemeral_storage: Limit for ephemeral storage :type limit_ephemeral_storage: float | str """ + def __init__( -self, -request_memory=None, -request_cpu=None, -request_ephemeral_storage=None, -limit_memory=None, -limit_cpu=None, -limit_gpu=None, -limit_ephemeral_storage=None): +self, +request_memory=None, +request_cpu=None, +request_ephemeral_storage=None, +limit_memory=None, +limit_cpu=None, +limit_gpu=None, +limit_ephemeral_storage=None +): self.request_memory = request_memory self.request_cpu = request_cpu self.request_ephemeral_storage = request_ephemeral_storage @@ -104,9 +112,10 @@ class Port(K8SModel): __slots__ = ('name', 'container_port') def __init__( -self, -name=None, -container_port=None): +self, +name=None, +container_port=None +): """Creates port""" self.name = name self.container_port = container_port diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index d27a647..05df204 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream from requests.exceptions import BaseHTTPError from airflow import AirflowException +from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod from airflow.kubernetes.pod_generator import PodDefaults -from airflow.settings import pod_mutation_hook +from airflow
[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b Author: Daniel Imberman AuthorDate: Thu Jul 30 11:40:23 2020 -0700 Fixes PodMutationHook for backwards compatibility (#9903) Co-authored-by: Daniel Imberman Co-authored-by: Kaxil Naik --- airflow/kubernetes/k8s_model.py | 16 +++ airflow/kubernetes/pod.py| 33 -- airflow/kubernetes/pod_launcher.py | 26 +++- airflow/kubernetes/pod_launcher_helper.py| 96 +++ airflow/kubernetes/volume_mount.py | 1 + airflow/kubernetes_deprecated/__init__.py| 16 +++ airflow/kubernetes_deprecated/pod.py | 171 +++ docs/conf.py | 1 + tests/kubernetes/models/test_pod.py | 81 + tests/kubernetes/test_pod_launcher_helper.py | 97 +++ tests/test_local_settings.py | 96 +++ 11 files changed, 619 insertions(+), 15 deletions(-) diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py index 3fd2f9e..e10a946 100644 --- a/airflow/kubernetes/k8s_model.py +++ b/airflow/kubernetes/k8s_model.py @@ -29,6 +29,7 @@ else: class K8SModel(ABC): + """ These Airflow Kubernetes models are here for backwards compatibility reasons only. Ideally clients should use the kubernetes api @@ -39,6 +40,7 @@ class K8SModel(ABC): can be avoided. All of these models implement the `attach_to_pod` method so that they integrate with the kubernetes client. """ + @abc.abstractmethod def attach_to_pod(self, pod): """ @@ -47,9 +49,23 @@ class K8SModel(ABC): :return: The pod with the object attached """ +def as_dict(self): +res = {} +if hasattr(self, "__slots__"): +for s in self.__slots__: +if hasattr(self, s): +res[s] = getattr(self, s) +if hasattr(self, "__dict__"): +res_dict = self.__dict__.copy() +res_dict.update(res) +return res_dict +return res + def append_to_pod(pod, k8s_objects): """ +Attach Kubernetes objects to the given POD + :param pod: A pod to attach a list of Kubernetes objects to :type pod: kubernetes.client.models.V1Pod :param k8s_objects: a potential None list of K8SModels diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 0b332c2..9e455af 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel class Resources(K8SModel): -__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu') +__slots__ = ('request_memory', + 'request_cpu', + 'limit_memory', + 'limit_cpu', + 'limit_gpu', + 'request_ephemeral_storage', + 'limit_ephemeral_storage') """ :param request_memory: requested memory @@ -44,15 +50,17 @@ class Resources(K8SModel): :param limit_ephemeral_storage: Limit for ephemeral storage :type limit_ephemeral_storage: float | str """ + def __init__( -self, -request_memory=None, -request_cpu=None, -request_ephemeral_storage=None, -limit_memory=None, -limit_cpu=None, -limit_gpu=None, -limit_ephemeral_storage=None): +self, +request_memory=None, +request_cpu=None, +request_ephemeral_storage=None, +limit_memory=None, +limit_cpu=None, +limit_gpu=None, +limit_ephemeral_storage=None +): self.request_memory = request_memory self.request_cpu = request_cpu self.request_ephemeral_storage = request_ephemeral_storage @@ -104,9 +112,10 @@ class Port(K8SModel): __slots__ = ('name', 'container_port') def __init__( -self, -name=None, -container_port=None): +self, +name=None, +container_port=None +): """Creates port""" self.name = name self.container_port = container_port diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index d27a647..05df204 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream from requests.exceptions import BaseHTTPError from airflow import AirflowException +from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod from airflow.kubernetes.pod_generator import PodDefaults -from airflow.settings import pod_mutation_hook +from airflow
[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b Author: Daniel Imberman AuthorDate: Thu Jul 30 11:40:23 2020 -0700 Fixes PodMutationHook for backwards compatibility (#9903) Co-authored-by: Daniel Imberman Co-authored-by: Kaxil Naik --- airflow/kubernetes/k8s_model.py | 16 +++ airflow/kubernetes/pod.py| 33 -- airflow/kubernetes/pod_launcher.py | 26 +++- airflow/kubernetes/pod_launcher_helper.py| 96 +++ airflow/kubernetes/volume_mount.py | 1 + airflow/kubernetes_deprecated/__init__.py| 16 +++ airflow/kubernetes_deprecated/pod.py | 171 +++ docs/conf.py | 1 + tests/kubernetes/models/test_pod.py | 81 + tests/kubernetes/test_pod_launcher_helper.py | 97 +++ tests/test_local_settings.py | 96 +++ 11 files changed, 619 insertions(+), 15 deletions(-) diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py index 3fd2f9e..e10a946 100644 --- a/airflow/kubernetes/k8s_model.py +++ b/airflow/kubernetes/k8s_model.py @@ -29,6 +29,7 @@ else: class K8SModel(ABC): + """ These Airflow Kubernetes models are here for backwards compatibility reasons only. Ideally clients should use the kubernetes api @@ -39,6 +40,7 @@ class K8SModel(ABC): can be avoided. All of these models implement the `attach_to_pod` method so that they integrate with the kubernetes client. """ + @abc.abstractmethod def attach_to_pod(self, pod): """ @@ -47,9 +49,23 @@ class K8SModel(ABC): :return: The pod with the object attached """ +def as_dict(self): +res = {} +if hasattr(self, "__slots__"): +for s in self.__slots__: +if hasattr(self, s): +res[s] = getattr(self, s) +if hasattr(self, "__dict__"): +res_dict = self.__dict__.copy() +res_dict.update(res) +return res_dict +return res + def append_to_pod(pod, k8s_objects): """ +Attach Kubernetes objects to the given POD + :param pod: A pod to attach a list of Kubernetes objects to :type pod: kubernetes.client.models.V1Pod :param k8s_objects: a potential None list of K8SModels diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 0b332c2..9e455af 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel class Resources(K8SModel): -__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu') +__slots__ = ('request_memory', + 'request_cpu', + 'limit_memory', + 'limit_cpu', + 'limit_gpu', + 'request_ephemeral_storage', + 'limit_ephemeral_storage') """ :param request_memory: requested memory @@ -44,15 +50,17 @@ class Resources(K8SModel): :param limit_ephemeral_storage: Limit for ephemeral storage :type limit_ephemeral_storage: float | str """ + def __init__( -self, -request_memory=None, -request_cpu=None, -request_ephemeral_storage=None, -limit_memory=None, -limit_cpu=None, -limit_gpu=None, -limit_ephemeral_storage=None): +self, +request_memory=None, +request_cpu=None, +request_ephemeral_storage=None, +limit_memory=None, +limit_cpu=None, +limit_gpu=None, +limit_ephemeral_storage=None +): self.request_memory = request_memory self.request_cpu = request_cpu self.request_ephemeral_storage = request_ephemeral_storage @@ -104,9 +112,10 @@ class Port(K8SModel): __slots__ = ('name', 'container_port') def __init__( -self, -name=None, -container_port=None): +self, +name=None, +container_port=None +): """Creates port""" self.name = name self.container_port = container_port diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index d27a647..05df204 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream from requests.exceptions import BaseHTTPError from airflow import AirflowException +from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod from airflow.kubernetes.pod_generator import PodDefaults -from airflow.settings import pod_mutation_hook +from airflow
[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b Author: Daniel Imberman AuthorDate: Thu Jul 30 11:40:23 2020 -0700 Fixes PodMutationHook for backwards compatibility (#9903) Co-authored-by: Daniel Imberman Co-authored-by: Kaxil Naik --- airflow/kubernetes/k8s_model.py | 16 +++ airflow/kubernetes/pod.py| 33 -- airflow/kubernetes/pod_launcher.py | 26 +++- airflow/kubernetes/pod_launcher_helper.py| 96 +++ airflow/kubernetes/volume_mount.py | 1 + airflow/kubernetes_deprecated/__init__.py| 16 +++ airflow/kubernetes_deprecated/pod.py | 171 +++ docs/conf.py | 1 + tests/kubernetes/models/test_pod.py | 81 + tests/kubernetes/test_pod_launcher_helper.py | 97 +++ tests/test_local_settings.py | 96 +++ 11 files changed, 619 insertions(+), 15 deletions(-) diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py index 3fd2f9e..e10a946 100644 --- a/airflow/kubernetes/k8s_model.py +++ b/airflow/kubernetes/k8s_model.py @@ -29,6 +29,7 @@ else: class K8SModel(ABC): + """ These Airflow Kubernetes models are here for backwards compatibility reasons only. Ideally clients should use the kubernetes api @@ -39,6 +40,7 @@ class K8SModel(ABC): can be avoided. All of these models implement the `attach_to_pod` method so that they integrate with the kubernetes client. """ + @abc.abstractmethod def attach_to_pod(self, pod): """ @@ -47,9 +49,23 @@ class K8SModel(ABC): :return: The pod with the object attached """ +def as_dict(self): +res = {} +if hasattr(self, "__slots__"): +for s in self.__slots__: +if hasattr(self, s): +res[s] = getattr(self, s) +if hasattr(self, "__dict__"): +res_dict = self.__dict__.copy() +res_dict.update(res) +return res_dict +return res + def append_to_pod(pod, k8s_objects): """ +Attach Kubernetes objects to the given POD + :param pod: A pod to attach a list of Kubernetes objects to :type pod: kubernetes.client.models.V1Pod :param k8s_objects: a potential None list of K8SModels diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 0b332c2..9e455af 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel class Resources(K8SModel): -__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu') +__slots__ = ('request_memory', + 'request_cpu', + 'limit_memory', + 'limit_cpu', + 'limit_gpu', + 'request_ephemeral_storage', + 'limit_ephemeral_storage') """ :param request_memory: requested memory @@ -44,15 +50,17 @@ class Resources(K8SModel): :param limit_ephemeral_storage: Limit for ephemeral storage :type limit_ephemeral_storage: float | str """ + def __init__( -self, -request_memory=None, -request_cpu=None, -request_ephemeral_storage=None, -limit_memory=None, -limit_cpu=None, -limit_gpu=None, -limit_ephemeral_storage=None): +self, +request_memory=None, +request_cpu=None, +request_ephemeral_storage=None, +limit_memory=None, +limit_cpu=None, +limit_gpu=None, +limit_ephemeral_storage=None +): self.request_memory = request_memory self.request_cpu = request_cpu self.request_ephemeral_storage = request_ephemeral_storage @@ -104,9 +112,10 @@ class Port(K8SModel): __slots__ = ('name', 'container_port') def __init__( -self, -name=None, -container_port=None): +self, +name=None, +container_port=None +): """Creates port""" self.name = name self.container_port = container_port diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index d27a647..05df204 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream from requests.exceptions import BaseHTTPError from airflow import AirflowException +from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod from airflow.kubernetes.pod_generator import PodDefaults -from airflow.settings import pod_mutation_hook +from airflow
[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b Author: Daniel Imberman AuthorDate: Thu Jul 30 11:40:23 2020 -0700 Fixes PodMutationHook for backwards compatibility (#9903) Co-authored-by: Daniel Imberman Co-authored-by: Kaxil Naik --- airflow/kubernetes/k8s_model.py | 16 +++ airflow/kubernetes/pod.py| 33 -- airflow/kubernetes/pod_launcher.py | 26 +++- airflow/kubernetes/pod_launcher_helper.py| 96 +++ airflow/kubernetes/volume_mount.py | 1 + airflow/kubernetes_deprecated/__init__.py| 16 +++ airflow/kubernetes_deprecated/pod.py | 171 +++ docs/conf.py | 1 + tests/kubernetes/models/test_pod.py | 81 + tests/kubernetes/test_pod_launcher_helper.py | 97 +++ tests/test_local_settings.py | 96 +++ 11 files changed, 619 insertions(+), 15 deletions(-) diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py index 3fd2f9e..e10a946 100644 --- a/airflow/kubernetes/k8s_model.py +++ b/airflow/kubernetes/k8s_model.py @@ -29,6 +29,7 @@ else: class K8SModel(ABC): + """ These Airflow Kubernetes models are here for backwards compatibility reasons only. Ideally clients should use the kubernetes api @@ -39,6 +40,7 @@ class K8SModel(ABC): can be avoided. All of these models implement the `attach_to_pod` method so that they integrate with the kubernetes client. """ + @abc.abstractmethod def attach_to_pod(self, pod): """ @@ -47,9 +49,23 @@ class K8SModel(ABC): :return: The pod with the object attached """ +def as_dict(self): +res = {} +if hasattr(self, "__slots__"): +for s in self.__slots__: +if hasattr(self, s): +res[s] = getattr(self, s) +if hasattr(self, "__dict__"): +res_dict = self.__dict__.copy() +res_dict.update(res) +return res_dict +return res + def append_to_pod(pod, k8s_objects): """ +Attach Kubernetes objects to the given POD + :param pod: A pod to attach a list of Kubernetes objects to :type pod: kubernetes.client.models.V1Pod :param k8s_objects: a potential None list of K8SModels diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py index 0b332c2..9e455af 100644 --- a/airflow/kubernetes/pod.py +++ b/airflow/kubernetes/pod.py @@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel class Resources(K8SModel): -__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu') +__slots__ = ('request_memory', + 'request_cpu', + 'limit_memory', + 'limit_cpu', + 'limit_gpu', + 'request_ephemeral_storage', + 'limit_ephemeral_storage') """ :param request_memory: requested memory @@ -44,15 +50,17 @@ class Resources(K8SModel): :param limit_ephemeral_storage: Limit for ephemeral storage :type limit_ephemeral_storage: float | str """ + def __init__( -self, -request_memory=None, -request_cpu=None, -request_ephemeral_storage=None, -limit_memory=None, -limit_cpu=None, -limit_gpu=None, -limit_ephemeral_storage=None): +self, +request_memory=None, +request_cpu=None, +request_ephemeral_storage=None, +limit_memory=None, +limit_cpu=None, +limit_gpu=None, +limit_ephemeral_storage=None +): self.request_memory = request_memory self.request_cpu = request_cpu self.request_ephemeral_storage = request_ephemeral_storage @@ -104,9 +112,10 @@ class Port(K8SModel): __slots__ = ('name', 'container_port') def __init__( -self, -name=None, -container_port=None): +self, +name=None, +container_port=None +): """Creates port""" self.name = name self.container_port = container_port diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py index d27a647..05df204 100644 --- a/airflow/kubernetes/pod_launcher.py +++ b/airflow/kubernetes/pod_launcher.py @@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream from requests.exceptions import BaseHTTPError from airflow import AirflowException +from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod from airflow.kubernetes.pod_generator import PodDefaults -from airflow.settings import pod_mutation_hook +from airflow