[airflow] 32/32: Makes multi-namespace mode optional (#9570)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619 Author: Daniel Imberman AuthorDate: Mon Aug 10 13:41:40 2020 -0700 Makes multi-namespace mode optional (#9570) Running the airflow k8sexecutor with multiple namespace abilities requires creating a ClusterRole which can break existing deployments Co-authored-by: Daniel Imberman (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e) --- airflow/config_templates/config.yml | 7 ++ airflow/config_templates/default_airflow.cfg | 4 airflow/executors/kubernetes_executor.py | 32 +++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f54255e..75c47cb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1812,6 +1812,13 @@ type: string example: ~ default: "default" +- name: multi_namespace_mode + description: | +Allows users to launch pods in multiple namespaces. +Will require creating a cluster-role for the scheduler + type: boolean + example: ~ + default: "False" - name: airflow_configmap description: | The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e18e538..3a9bba2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) # Example: airflow_configmap = airflow-configmap airflow_configmap = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3ad4222..7b31b45 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -22,6 +22,7 @@ KubernetesExecutor :ref:`executor:KubernetesExecutor` """ import base64 +import functools import json import multiprocessing import time @@ -162,6 +163,7 @@ class KubeConfig: # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') +self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to @@ -254,9 +256,17 @@ class KubeConfig: class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" -def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + +def __init__(self, + namespace, + mult_namespace_mode, + watcher_queue, + resource_version, + worker_uuid, + kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace +self.multi_namespace_mode = mult_namespace_mode self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): kwargs[key] = value last_resource_version = None -for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, -**kwargs): +if self.multi_namespace_mode: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_pod_for_all_namespaces, + **kwargs) +else: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_namespaced_pod, + self.namespace, + **kwargs) +for event in list_worker_pods(): task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -377,8 +395,12 @@
[airflow] 32/32: Makes multi-namespace mode optional (#9570)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619 Author: Daniel Imberman AuthorDate: Mon Aug 10 13:41:40 2020 -0700 Makes multi-namespace mode optional (#9570) Running the airflow k8sexecutor with multiple namespace abilities requires creating a ClusterRole which can break existing deployments Co-authored-by: Daniel Imberman (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e) --- airflow/config_templates/config.yml | 7 ++ airflow/config_templates/default_airflow.cfg | 4 airflow/executors/kubernetes_executor.py | 32 +++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f54255e..75c47cb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1812,6 +1812,13 @@ type: string example: ~ default: "default" +- name: multi_namespace_mode + description: | +Allows users to launch pods in multiple namespaces. +Will require creating a cluster-role for the scheduler + type: boolean + example: ~ + default: "False" - name: airflow_configmap description: | The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e18e538..3a9bba2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) # Example: airflow_configmap = airflow-configmap airflow_configmap = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3ad4222..7b31b45 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -22,6 +22,7 @@ KubernetesExecutor :ref:`executor:KubernetesExecutor` """ import base64 +import functools import json import multiprocessing import time @@ -162,6 +163,7 @@ class KubeConfig: # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') +self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to @@ -254,9 +256,17 @@ class KubeConfig: class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" -def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + +def __init__(self, + namespace, + mult_namespace_mode, + watcher_queue, + resource_version, + worker_uuid, + kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace +self.multi_namespace_mode = mult_namespace_mode self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): kwargs[key] = value last_resource_version = None -for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, -**kwargs): +if self.multi_namespace_mode: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_pod_for_all_namespaces, + **kwargs) +else: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_namespaced_pod, + self.namespace, + **kwargs) +for event in list_worker_pods(): task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -377,8 +395,12 @@
[airflow] 32/32: Makes multi-namespace mode optional (#9570)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619 Author: Daniel Imberman AuthorDate: Mon Aug 10 13:41:40 2020 -0700 Makes multi-namespace mode optional (#9570) Running the airflow k8sexecutor with multiple namespace abilities requires creating a ClusterRole which can break existing deployments Co-authored-by: Daniel Imberman (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e) --- airflow/config_templates/config.yml | 7 ++ airflow/config_templates/default_airflow.cfg | 4 airflow/executors/kubernetes_executor.py | 32 +++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f54255e..75c47cb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1812,6 +1812,13 @@ type: string example: ~ default: "default" +- name: multi_namespace_mode + description: | +Allows users to launch pods in multiple namespaces. +Will require creating a cluster-role for the scheduler + type: boolean + example: ~ + default: "False" - name: airflow_configmap description: | The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e18e538..3a9bba2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) # Example: airflow_configmap = airflow-configmap airflow_configmap = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3ad4222..7b31b45 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -22,6 +22,7 @@ KubernetesExecutor :ref:`executor:KubernetesExecutor` """ import base64 +import functools import json import multiprocessing import time @@ -162,6 +163,7 @@ class KubeConfig: # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') +self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to @@ -254,9 +256,17 @@ class KubeConfig: class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" -def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + +def __init__(self, + namespace, + mult_namespace_mode, + watcher_queue, + resource_version, + worker_uuid, + kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace +self.multi_namespace_mode = mult_namespace_mode self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): kwargs[key] = value last_resource_version = None -for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, -**kwargs): +if self.multi_namespace_mode: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_pod_for_all_namespaces, + **kwargs) +else: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_namespaced_pod, + self.namespace, + **kwargs) +for event in list_worker_pods(): task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -377,8 +395,12 @@
[airflow] 32/32: Makes multi-namespace mode optional (#9570)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619 Author: Daniel Imberman AuthorDate: Mon Aug 10 13:41:40 2020 -0700 Makes multi-namespace mode optional (#9570) Running the airflow k8sexecutor with multiple namespace abilities requires creating a ClusterRole which can break existing deployments Co-authored-by: Daniel Imberman (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e) --- airflow/config_templates/config.yml | 7 ++ airflow/config_templates/default_airflow.cfg | 4 airflow/executors/kubernetes_executor.py | 32 +++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f54255e..75c47cb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1812,6 +1812,13 @@ type: string example: ~ default: "default" +- name: multi_namespace_mode + description: | +Allows users to launch pods in multiple namespaces. +Will require creating a cluster-role for the scheduler + type: boolean + example: ~ + default: "False" - name: airflow_configmap description: | The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e18e538..3a9bba2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) # Example: airflow_configmap = airflow-configmap airflow_configmap = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3ad4222..7b31b45 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -22,6 +22,7 @@ KubernetesExecutor :ref:`executor:KubernetesExecutor` """ import base64 +import functools import json import multiprocessing import time @@ -162,6 +163,7 @@ class KubeConfig: # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') +self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to @@ -254,9 +256,17 @@ class KubeConfig: class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" -def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + +def __init__(self, + namespace, + mult_namespace_mode, + watcher_queue, + resource_version, + worker_uuid, + kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace +self.multi_namespace_mode = mult_namespace_mode self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): kwargs[key] = value last_resource_version = None -for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, -**kwargs): +if self.multi_namespace_mode: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_pod_for_all_namespaces, + **kwargs) +else: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_namespaced_pod, + self.namespace, + **kwargs) +for event in list_worker_pods(): task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -377,8 +395,12 @@
[airflow] 32/32: Makes multi-namespace mode optional (#9570)
This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v1-10-test in repository https://gitbox.apache.org/repos/asf/airflow.git commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619 Author: Daniel Imberman AuthorDate: Mon Aug 10 13:41:40 2020 -0700 Makes multi-namespace mode optional (#9570) Running the airflow k8sexecutor with multiple namespace abilities requires creating a ClusterRole which can break existing deployments Co-authored-by: Daniel Imberman (cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e) --- airflow/config_templates/config.yml | 7 ++ airflow/config_templates/default_airflow.cfg | 4 airflow/executors/kubernetes_executor.py | 32 +++- 3 files changed, 38 insertions(+), 5 deletions(-) diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index f54255e..75c47cb 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -1812,6 +1812,13 @@ type: string example: ~ default: "default" +- name: multi_namespace_mode + description: | +Allows users to launch pods in multiple namespaces. +Will require creating a cluster-role for the scheduler + type: boolean + example: ~ + default: "False" - name: airflow_configmap description: | The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index e18e538..3a9bba2 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1 # The Kubernetes namespace where airflow workers should be created. Defaults to ``default`` namespace = default +# Allows users to launch pods in multiple namespaces. +# Will require creating a cluster-role for the scheduler +multi_namespace_mode = False + # The name of the Kubernetes ConfigMap containing the Airflow Configuration (this file) # Example: airflow_configmap = airflow-configmap airflow_configmap = diff --git a/airflow/executors/kubernetes_executor.py b/airflow/executors/kubernetes_executor.py index 3ad4222..7b31b45 100644 --- a/airflow/executors/kubernetes_executor.py +++ b/airflow/executors/kubernetes_executor.py @@ -22,6 +22,7 @@ KubernetesExecutor :ref:`executor:KubernetesExecutor` """ import base64 +import functools import json import multiprocessing import time @@ -162,6 +163,7 @@ class KubeConfig: # cluster has RBAC enabled, your scheduler may need service account permissions to # create, watch, get, and delete pods in this namespace. self.kube_namespace = conf.get(self.kubernetes_section, 'namespace') +self.multi_namespace_mode = conf.get(self.kubernetes_section, 'multi_namespace_mode') # The Kubernetes Namespace in which pods will be created by the executor. Note # that if your # cluster has RBAC enabled, your workers may need service account permissions to @@ -254,9 +256,17 @@ class KubeConfig: class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs""" -def __init__(self, namespace, watcher_queue, resource_version, worker_uuid, kube_config): + +def __init__(self, + namespace, + mult_namespace_mode, + watcher_queue, + resource_version, + worker_uuid, + kube_config): multiprocessing.Process.__init__(self) self.namespace = namespace +self.multi_namespace_mode = mult_namespace_mode self.worker_uuid = worker_uuid self.watcher_queue = watcher_queue self.resource_version = resource_version @@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): kwargs[key] = value last_resource_version = None -for event in watcher.stream(kube_client.list_namespaced_pod, self.namespace, -**kwargs): +if self.multi_namespace_mode: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_pod_for_all_namespaces, + **kwargs) +else: +list_worker_pods = functools.partial(watcher.stream, + kube_client.list_namespaced_pod, + self.namespace, + **kwargs) +for event in list_worker_pods(): task = event['object'] self.log.info( 'Event: %s had an event of type %s', @@ -377,8 +395,12 @@