[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b
Author: Daniel Imberman 
AuthorDate: Thu Jul 30 11:40:23 2020 -0700

Fixes PodMutationHook for backwards compatibility (#9903)

Co-authored-by: Daniel Imberman 
Co-authored-by: Kaxil Naik 
---
 airflow/kubernetes/k8s_model.py  |  16 +++
 airflow/kubernetes/pod.py|  33 --
 airflow/kubernetes/pod_launcher.py   |  26 +++-
 airflow/kubernetes/pod_launcher_helper.py|  96 +++
 airflow/kubernetes/volume_mount.py   |   1 +
 airflow/kubernetes_deprecated/__init__.py|  16 +++
 airflow/kubernetes_deprecated/pod.py | 171 +++
 docs/conf.py |   1 +
 tests/kubernetes/models/test_pod.py  |  81 +
 tests/kubernetes/test_pod_launcher_helper.py |  97 +++
 tests/test_local_settings.py |  96 +++
 11 files changed, 619 insertions(+), 15 deletions(-)

diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
index 3fd2f9e..e10a946 100644
--- a/airflow/kubernetes/k8s_model.py
+++ b/airflow/kubernetes/k8s_model.py
@@ -29,6 +29,7 @@ else:
 
 
 class K8SModel(ABC):
+
 """
 These Airflow Kubernetes models are here for backwards compatibility
 reasons only. Ideally clients should use the kubernetes api
@@ -39,6 +40,7 @@ class K8SModel(ABC):
 can be avoided. All of these models implement the
 `attach_to_pod` method so that they integrate with the kubernetes client.
 """
+
 @abc.abstractmethod
 def attach_to_pod(self, pod):
 """
@@ -47,9 +49,23 @@ class K8SModel(ABC):
 :return: The pod with the object attached
 """
 
+def as_dict(self):
+res = {}
+if hasattr(self, "__slots__"):
+for s in self.__slots__:
+if hasattr(self, s):
+res[s] = getattr(self, s)
+if hasattr(self, "__dict__"):
+res_dict = self.__dict__.copy()
+res_dict.update(res)
+return res_dict
+return res
+
 
 def append_to_pod(pod, k8s_objects):
 """
+Attach Kubernetes objects to the given POD
+
 :param pod: A pod to attach a list of Kubernetes objects to
 :type pod: kubernetes.client.models.V1Pod
 :param k8s_objects: a potential None list of K8SModels
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 0b332c2..9e455af 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel
 
 
 class Resources(K8SModel):
-__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
+__slots__ = ('request_memory',
+ 'request_cpu',
+ 'limit_memory',
+ 'limit_cpu',
+ 'limit_gpu',
+ 'request_ephemeral_storage',
+ 'limit_ephemeral_storage')
 
 """
 :param request_memory: requested memory
@@ -44,15 +50,17 @@ class Resources(K8SModel):
 :param limit_ephemeral_storage: Limit for ephemeral storage
 :type limit_ephemeral_storage: float | str
 """
+
 def __init__(
-self,
-request_memory=None,
-request_cpu=None,
-request_ephemeral_storage=None,
-limit_memory=None,
-limit_cpu=None,
-limit_gpu=None,
-limit_ephemeral_storage=None):
+self,
+request_memory=None,
+request_cpu=None,
+request_ephemeral_storage=None,
+limit_memory=None,
+limit_cpu=None,
+limit_gpu=None,
+limit_ephemeral_storage=None
+):
 self.request_memory = request_memory
 self.request_cpu = request_cpu
 self.request_ephemeral_storage = request_ephemeral_storage
@@ -104,9 +112,10 @@ class Port(K8SModel):
 __slots__ = ('name', 'container_port')
 
 def __init__(
-self,
-name=None,
-container_port=None):
+self,
+name=None,
+container_port=None
+):
 """Creates port"""
 self.name = name
 self.container_port = container_port
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index d27a647..05df204 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream
 from requests.exceptions import BaseHTTPError
 
 from airflow import AirflowException
+from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
 from airflow.kubernetes.pod_generator import PodDefaults
-from airflow.settings import pod_mutation_hook
+from airflow 

[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b
Author: Daniel Imberman 
AuthorDate: Thu Jul 30 11:40:23 2020 -0700

Fixes PodMutationHook for backwards compatibility (#9903)

Co-authored-by: Daniel Imberman 
Co-authored-by: Kaxil Naik 
---
 airflow/kubernetes/k8s_model.py  |  16 +++
 airflow/kubernetes/pod.py|  33 --
 airflow/kubernetes/pod_launcher.py   |  26 +++-
 airflow/kubernetes/pod_launcher_helper.py|  96 +++
 airflow/kubernetes/volume_mount.py   |   1 +
 airflow/kubernetes_deprecated/__init__.py|  16 +++
 airflow/kubernetes_deprecated/pod.py | 171 +++
 docs/conf.py |   1 +
 tests/kubernetes/models/test_pod.py  |  81 +
 tests/kubernetes/test_pod_launcher_helper.py |  97 +++
 tests/test_local_settings.py |  96 +++
 11 files changed, 619 insertions(+), 15 deletions(-)

diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
index 3fd2f9e..e10a946 100644
--- a/airflow/kubernetes/k8s_model.py
+++ b/airflow/kubernetes/k8s_model.py
@@ -29,6 +29,7 @@ else:
 
 
 class K8SModel(ABC):
+
 """
 These Airflow Kubernetes models are here for backwards compatibility
 reasons only. Ideally clients should use the kubernetes api
@@ -39,6 +40,7 @@ class K8SModel(ABC):
 can be avoided. All of these models implement the
 `attach_to_pod` method so that they integrate with the kubernetes client.
 """
+
 @abc.abstractmethod
 def attach_to_pod(self, pod):
 """
@@ -47,9 +49,23 @@ class K8SModel(ABC):
 :return: The pod with the object attached
 """
 
+def as_dict(self):
+res = {}
+if hasattr(self, "__slots__"):
+for s in self.__slots__:
+if hasattr(self, s):
+res[s] = getattr(self, s)
+if hasattr(self, "__dict__"):
+res_dict = self.__dict__.copy()
+res_dict.update(res)
+return res_dict
+return res
+
 
 def append_to_pod(pod, k8s_objects):
 """
+Attach Kubernetes objects to the given POD
+
 :param pod: A pod to attach a list of Kubernetes objects to
 :type pod: kubernetes.client.models.V1Pod
 :param k8s_objects: a potential None list of K8SModels
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 0b332c2..9e455af 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel
 
 
 class Resources(K8SModel):
-__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
+__slots__ = ('request_memory',
+ 'request_cpu',
+ 'limit_memory',
+ 'limit_cpu',
+ 'limit_gpu',
+ 'request_ephemeral_storage',
+ 'limit_ephemeral_storage')
 
 """
 :param request_memory: requested memory
@@ -44,15 +50,17 @@ class Resources(K8SModel):
 :param limit_ephemeral_storage: Limit for ephemeral storage
 :type limit_ephemeral_storage: float | str
 """
+
 def __init__(
-self,
-request_memory=None,
-request_cpu=None,
-request_ephemeral_storage=None,
-limit_memory=None,
-limit_cpu=None,
-limit_gpu=None,
-limit_ephemeral_storage=None):
+self,
+request_memory=None,
+request_cpu=None,
+request_ephemeral_storage=None,
+limit_memory=None,
+limit_cpu=None,
+limit_gpu=None,
+limit_ephemeral_storage=None
+):
 self.request_memory = request_memory
 self.request_cpu = request_cpu
 self.request_ephemeral_storage = request_ephemeral_storage
@@ -104,9 +112,10 @@ class Port(K8SModel):
 __slots__ = ('name', 'container_port')
 
 def __init__(
-self,
-name=None,
-container_port=None):
+self,
+name=None,
+container_port=None
+):
 """Creates port"""
 self.name = name
 self.container_port = container_port
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index d27a647..05df204 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream
 from requests.exceptions import BaseHTTPError
 
 from airflow import AirflowException
+from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
 from airflow.kubernetes.pod_generator import PodDefaults
-from airflow.settings import pod_mutation_hook
+from airflow 

[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b
Author: Daniel Imberman 
AuthorDate: Thu Jul 30 11:40:23 2020 -0700

Fixes PodMutationHook for backwards compatibility (#9903)

Co-authored-by: Daniel Imberman 
Co-authored-by: Kaxil Naik 
---
 airflow/kubernetes/k8s_model.py  |  16 +++
 airflow/kubernetes/pod.py|  33 --
 airflow/kubernetes/pod_launcher.py   |  26 +++-
 airflow/kubernetes/pod_launcher_helper.py|  96 +++
 airflow/kubernetes/volume_mount.py   |   1 +
 airflow/kubernetes_deprecated/__init__.py|  16 +++
 airflow/kubernetes_deprecated/pod.py | 171 +++
 docs/conf.py |   1 +
 tests/kubernetes/models/test_pod.py  |  81 +
 tests/kubernetes/test_pod_launcher_helper.py |  97 +++
 tests/test_local_settings.py |  96 +++
 11 files changed, 619 insertions(+), 15 deletions(-)

diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
index 3fd2f9e..e10a946 100644
--- a/airflow/kubernetes/k8s_model.py
+++ b/airflow/kubernetes/k8s_model.py
@@ -29,6 +29,7 @@ else:
 
 
 class K8SModel(ABC):
+
 """
 These Airflow Kubernetes models are here for backwards compatibility
 reasons only. Ideally clients should use the kubernetes api
@@ -39,6 +40,7 @@ class K8SModel(ABC):
 can be avoided. All of these models implement the
 `attach_to_pod` method so that they integrate with the kubernetes client.
 """
+
 @abc.abstractmethod
 def attach_to_pod(self, pod):
 """
@@ -47,9 +49,23 @@ class K8SModel(ABC):
 :return: The pod with the object attached
 """
 
+def as_dict(self):
+res = {}
+if hasattr(self, "__slots__"):
+for s in self.__slots__:
+if hasattr(self, s):
+res[s] = getattr(self, s)
+if hasattr(self, "__dict__"):
+res_dict = self.__dict__.copy()
+res_dict.update(res)
+return res_dict
+return res
+
 
 def append_to_pod(pod, k8s_objects):
 """
+Attach Kubernetes objects to the given POD
+
 :param pod: A pod to attach a list of Kubernetes objects to
 :type pod: kubernetes.client.models.V1Pod
 :param k8s_objects: a potential None list of K8SModels
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 0b332c2..9e455af 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel
 
 
 class Resources(K8SModel):
-__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
+__slots__ = ('request_memory',
+ 'request_cpu',
+ 'limit_memory',
+ 'limit_cpu',
+ 'limit_gpu',
+ 'request_ephemeral_storage',
+ 'limit_ephemeral_storage')
 
 """
 :param request_memory: requested memory
@@ -44,15 +50,17 @@ class Resources(K8SModel):
 :param limit_ephemeral_storage: Limit for ephemeral storage
 :type limit_ephemeral_storage: float | str
 """
+
 def __init__(
-self,
-request_memory=None,
-request_cpu=None,
-request_ephemeral_storage=None,
-limit_memory=None,
-limit_cpu=None,
-limit_gpu=None,
-limit_ephemeral_storage=None):
+self,
+request_memory=None,
+request_cpu=None,
+request_ephemeral_storage=None,
+limit_memory=None,
+limit_cpu=None,
+limit_gpu=None,
+limit_ephemeral_storage=None
+):
 self.request_memory = request_memory
 self.request_cpu = request_cpu
 self.request_ephemeral_storage = request_ephemeral_storage
@@ -104,9 +112,10 @@ class Port(K8SModel):
 __slots__ = ('name', 'container_port')
 
 def __init__(
-self,
-name=None,
-container_port=None):
+self,
+name=None,
+container_port=None
+):
 """Creates port"""
 self.name = name
 self.container_port = container_port
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index d27a647..05df204 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream
 from requests.exceptions import BaseHTTPError
 
 from airflow import AirflowException
+from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
 from airflow.kubernetes.pod_generator import PodDefaults
-from airflow.settings import pod_mutation_hook
+from airflow 

[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b
Author: Daniel Imberman 
AuthorDate: Thu Jul 30 11:40:23 2020 -0700

Fixes PodMutationHook for backwards compatibility (#9903)

Co-authored-by: Daniel Imberman 
Co-authored-by: Kaxil Naik 
---
 airflow/kubernetes/k8s_model.py  |  16 +++
 airflow/kubernetes/pod.py|  33 --
 airflow/kubernetes/pod_launcher.py   |  26 +++-
 airflow/kubernetes/pod_launcher_helper.py|  96 +++
 airflow/kubernetes/volume_mount.py   |   1 +
 airflow/kubernetes_deprecated/__init__.py|  16 +++
 airflow/kubernetes_deprecated/pod.py | 171 +++
 docs/conf.py |   1 +
 tests/kubernetes/models/test_pod.py  |  81 +
 tests/kubernetes/test_pod_launcher_helper.py |  97 +++
 tests/test_local_settings.py |  96 +++
 11 files changed, 619 insertions(+), 15 deletions(-)

diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
index 3fd2f9e..e10a946 100644
--- a/airflow/kubernetes/k8s_model.py
+++ b/airflow/kubernetes/k8s_model.py
@@ -29,6 +29,7 @@ else:
 
 
 class K8SModel(ABC):
+
 """
 These Airflow Kubernetes models are here for backwards compatibility
 reasons only. Ideally clients should use the kubernetes api
@@ -39,6 +40,7 @@ class K8SModel(ABC):
 can be avoided. All of these models implement the
 `attach_to_pod` method so that they integrate with the kubernetes client.
 """
+
 @abc.abstractmethod
 def attach_to_pod(self, pod):
 """
@@ -47,9 +49,23 @@ class K8SModel(ABC):
 :return: The pod with the object attached
 """
 
+def as_dict(self):
+res = {}
+if hasattr(self, "__slots__"):
+for s in self.__slots__:
+if hasattr(self, s):
+res[s] = getattr(self, s)
+if hasattr(self, "__dict__"):
+res_dict = self.__dict__.copy()
+res_dict.update(res)
+return res_dict
+return res
+
 
 def append_to_pod(pod, k8s_objects):
 """
+Attach Kubernetes objects to the given POD
+
 :param pod: A pod to attach a list of Kubernetes objects to
 :type pod: kubernetes.client.models.V1Pod
 :param k8s_objects: a potential None list of K8SModels
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 0b332c2..9e455af 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel
 
 
 class Resources(K8SModel):
-__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
+__slots__ = ('request_memory',
+ 'request_cpu',
+ 'limit_memory',
+ 'limit_cpu',
+ 'limit_gpu',
+ 'request_ephemeral_storage',
+ 'limit_ephemeral_storage')
 
 """
 :param request_memory: requested memory
@@ -44,15 +50,17 @@ class Resources(K8SModel):
 :param limit_ephemeral_storage: Limit for ephemeral storage
 :type limit_ephemeral_storage: float | str
 """
+
 def __init__(
-self,
-request_memory=None,
-request_cpu=None,
-request_ephemeral_storage=None,
-limit_memory=None,
-limit_cpu=None,
-limit_gpu=None,
-limit_ephemeral_storage=None):
+self,
+request_memory=None,
+request_cpu=None,
+request_ephemeral_storage=None,
+limit_memory=None,
+limit_cpu=None,
+limit_gpu=None,
+limit_ephemeral_storage=None
+):
 self.request_memory = request_memory
 self.request_cpu = request_cpu
 self.request_ephemeral_storage = request_ephemeral_storage
@@ -104,9 +112,10 @@ class Port(K8SModel):
 __slots__ = ('name', 'container_port')
 
 def __init__(
-self,
-name=None,
-container_port=None):
+self,
+name=None,
+container_port=None
+):
 """Creates port"""
 self.name = name
 self.container_port = container_port
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index d27a647..05df204 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream
 from requests.exceptions import BaseHTTPError
 
 from airflow import AirflowException
+from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
 from airflow.kubernetes.pod_generator import PodDefaults
-from airflow.settings import pod_mutation_hook
+from airflow 

[airflow] 20/32: Fixes PodMutationHook for backwards compatibility (#9903)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit bcd02ddb81a07026dcbbc5e5a4dc669a6483b59b
Author: Daniel Imberman 
AuthorDate: Thu Jul 30 11:40:23 2020 -0700

Fixes PodMutationHook for backwards compatibility (#9903)

Co-authored-by: Daniel Imberman 
Co-authored-by: Kaxil Naik 
---
 airflow/kubernetes/k8s_model.py  |  16 +++
 airflow/kubernetes/pod.py|  33 --
 airflow/kubernetes/pod_launcher.py   |  26 +++-
 airflow/kubernetes/pod_launcher_helper.py|  96 +++
 airflow/kubernetes/volume_mount.py   |   1 +
 airflow/kubernetes_deprecated/__init__.py|  16 +++
 airflow/kubernetes_deprecated/pod.py | 171 +++
 docs/conf.py |   1 +
 tests/kubernetes/models/test_pod.py  |  81 +
 tests/kubernetes/test_pod_launcher_helper.py |  97 +++
 tests/test_local_settings.py |  96 +++
 11 files changed, 619 insertions(+), 15 deletions(-)

diff --git a/airflow/kubernetes/k8s_model.py b/airflow/kubernetes/k8s_model.py
index 3fd2f9e..e10a946 100644
--- a/airflow/kubernetes/k8s_model.py
+++ b/airflow/kubernetes/k8s_model.py
@@ -29,6 +29,7 @@ else:
 
 
 class K8SModel(ABC):
+
 """
 These Airflow Kubernetes models are here for backwards compatibility
 reasons only. Ideally clients should use the kubernetes api
@@ -39,6 +40,7 @@ class K8SModel(ABC):
 can be avoided. All of these models implement the
 `attach_to_pod` method so that they integrate with the kubernetes client.
 """
+
 @abc.abstractmethod
 def attach_to_pod(self, pod):
 """
@@ -47,9 +49,23 @@ class K8SModel(ABC):
 :return: The pod with the object attached
 """
 
+def as_dict(self):
+res = {}
+if hasattr(self, "__slots__"):
+for s in self.__slots__:
+if hasattr(self, s):
+res[s] = getattr(self, s)
+if hasattr(self, "__dict__"):
+res_dict = self.__dict__.copy()
+res_dict.update(res)
+return res_dict
+return res
+
 
 def append_to_pod(pod, k8s_objects):
 """
+Attach Kubernetes objects to the given POD
+
 :param pod: A pod to attach a list of Kubernetes objects to
 :type pod: kubernetes.client.models.V1Pod
 :param k8s_objects: a potential None list of K8SModels
diff --git a/airflow/kubernetes/pod.py b/airflow/kubernetes/pod.py
index 0b332c2..9e455af 100644
--- a/airflow/kubernetes/pod.py
+++ b/airflow/kubernetes/pod.py
@@ -26,7 +26,13 @@ from airflow.kubernetes.k8s_model import K8SModel
 
 
 class Resources(K8SModel):
-__slots__ = ('request_memory', 'request_cpu', 'limit_memory', 'limit_cpu', 'limit_gpu')
+__slots__ = ('request_memory',
+ 'request_cpu',
+ 'limit_memory',
+ 'limit_cpu',
+ 'limit_gpu',
+ 'request_ephemeral_storage',
+ 'limit_ephemeral_storage')
 
 """
 :param request_memory: requested memory
@@ -44,15 +50,17 @@ class Resources(K8SModel):
 :param limit_ephemeral_storage: Limit for ephemeral storage
 :type limit_ephemeral_storage: float | str
 """
+
 def __init__(
-self,
-request_memory=None,
-request_cpu=None,
-request_ephemeral_storage=None,
-limit_memory=None,
-limit_cpu=None,
-limit_gpu=None,
-limit_ephemeral_storage=None):
+self,
+request_memory=None,
+request_cpu=None,
+request_ephemeral_storage=None,
+limit_memory=None,
+limit_cpu=None,
+limit_gpu=None,
+limit_ephemeral_storage=None
+):
 self.request_memory = request_memory
 self.request_cpu = request_cpu
 self.request_ephemeral_storage = request_ephemeral_storage
@@ -104,9 +112,10 @@ class Port(K8SModel):
 __slots__ = ('name', 'container_port')
 
 def __init__(
-self,
-name=None,
-container_port=None):
+self,
+name=None,
+container_port=None
+):
 """Creates port"""
 self.name = name
 self.container_port = container_port
diff --git a/airflow/kubernetes/pod_launcher.py b/airflow/kubernetes/pod_launcher.py
index d27a647..05df204 100644
--- a/airflow/kubernetes/pod_launcher.py
+++ b/airflow/kubernetes/pod_launcher.py
@@ -26,10 +26,12 @@ from kubernetes.stream import stream as kubernetes_stream
 from requests.exceptions import BaseHTTPError
 
 from airflow import AirflowException
+from airflow.kubernetes.pod_launcher_helper import convert_to_airflow_pod
 from airflow.kubernetes.pod_generator import PodDefaults
-from airflow.settings import pod_mutation_hook
+from airflow