[airflow] 32/32: Makes multi-namespace mode optional (#9570)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619
Author: Daniel Imberman 
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

Makes multi-namespace mode optional (#9570)

Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman 
(cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml  |  7 ++
 airflow/config_templates/default_airflow.cfg |  4 
 airflow/executors/kubernetes_executor.py | 32 +++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
   type: string
   example: ~
   default: "default"
+- name: multi_namespace_mode
+  description: |
+Allows users to launch pods in multiple namespaces.
+Will require creating a cluster-role for the scheduler
+  type: boolean
+  example: ~
+  default: "False"
 - name: airflow_configmap
   description: |
 The name of the Kubernetes ConfigMap containing the Airflow 
Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults 
to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration 
(this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
 :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
 # cluster has RBAC enabled, your scheduler may need service account 
permissions to
 # create, watch, get, and delete pods in this namespace.
 self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+self.multi_namespace_mode = conf.get(self.kubernetes_section, 
'multi_namespace_mode')
 # The Kubernetes Namespace in which pods will be created by the 
executor. Note
 # that if your
 # cluster has RBAC enabled, your workers may need service account 
permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 """Watches for Kubernetes jobs"""
-def __init__(self, namespace, watcher_queue, resource_version, 
worker_uuid, kube_config):
+
+def __init__(self,
+ namespace,
+ mult_namespace_mode,
+ watcher_queue,
+ resource_version,
+ worker_uuid,
+ kube_config):
 multiprocessing.Process.__init__(self)
 self.namespace = namespace
+self.multi_namespace_mode = mult_namespace_mode
 self.worker_uuid = worker_uuid
 self.watcher_queue = watcher_queue
 self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
 kwargs[key] = value
 
 last_resource_version = None
-for event in watcher.stream(kube_client.list_namespaced_pod, 
self.namespace,
-**kwargs):
+if self.multi_namespace_mode:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_pod_for_all_namespaces,
+ **kwargs)
+else:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_namespaced_pod,
+ self.namespace,
+ **kwargs)
+for event in list_worker_pods():
 task = event['object']
 self.log.info(
 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ 

[airflow] 32/32: Makes multi-namespace mode optional (#9570)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619
Author: Daniel Imberman 
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

Makes multi-namespace mode optional (#9570)

Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman 
(cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml  |  7 ++
 airflow/config_templates/default_airflow.cfg |  4 
 airflow/executors/kubernetes_executor.py | 32 +++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
   type: string
   example: ~
   default: "default"
+- name: multi_namespace_mode
+  description: |
+Allows users to launch pods in multiple namespaces.
+Will require creating a cluster-role for the scheduler
+  type: boolean
+  example: ~
+  default: "False"
 - name: airflow_configmap
   description: |
 The name of the Kubernetes ConfigMap containing the Airflow 
Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults 
to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration 
(this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
 :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
 # cluster has RBAC enabled, your scheduler may need service account 
permissions to
 # create, watch, get, and delete pods in this namespace.
 self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+self.multi_namespace_mode = conf.get(self.kubernetes_section, 
'multi_namespace_mode')
 # The Kubernetes Namespace in which pods will be created by the 
executor. Note
 # that if your
 # cluster has RBAC enabled, your workers may need service account 
permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 """Watches for Kubernetes jobs"""
-def __init__(self, namespace, watcher_queue, resource_version, 
worker_uuid, kube_config):
+
+def __init__(self,
+ namespace,
+ mult_namespace_mode,
+ watcher_queue,
+ resource_version,
+ worker_uuid,
+ kube_config):
 multiprocessing.Process.__init__(self)
 self.namespace = namespace
+self.multi_namespace_mode = mult_namespace_mode
 self.worker_uuid = worker_uuid
 self.watcher_queue = watcher_queue
 self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
 kwargs[key] = value
 
 last_resource_version = None
-for event in watcher.stream(kube_client.list_namespaced_pod, 
self.namespace,
-**kwargs):
+if self.multi_namespace_mode:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_pod_for_all_namespaces,
+ **kwargs)
+else:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_namespaced_pod,
+ self.namespace,
+ **kwargs)
+for event in list_worker_pods():
 task = event['object']
 self.log.info(
 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ 

[airflow] 32/32: Makes multi-namespace mode optional (#9570)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619
Author: Daniel Imberman 
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

Makes multi-namespace mode optional (#9570)

Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman 
(cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml  |  7 ++
 airflow/config_templates/default_airflow.cfg |  4 
 airflow/executors/kubernetes_executor.py | 32 +++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
   type: string
   example: ~
   default: "default"
+- name: multi_namespace_mode
+  description: |
+Allows users to launch pods in multiple namespaces.
+Will require creating a cluster-role for the scheduler
+  type: boolean
+  example: ~
+  default: "False"
 - name: airflow_configmap
   description: |
 The name of the Kubernetes ConfigMap containing the Airflow 
Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults 
to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration 
(this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
 :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
 # cluster has RBAC enabled, your scheduler may need service account 
permissions to
 # create, watch, get, and delete pods in this namespace.
 self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+self.multi_namespace_mode = conf.get(self.kubernetes_section, 
'multi_namespace_mode')
 # The Kubernetes Namespace in which pods will be created by the 
executor. Note
 # that if your
 # cluster has RBAC enabled, your workers may need service account 
permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 """Watches for Kubernetes jobs"""
-def __init__(self, namespace, watcher_queue, resource_version, 
worker_uuid, kube_config):
+
+def __init__(self,
+ namespace,
+ mult_namespace_mode,
+ watcher_queue,
+ resource_version,
+ worker_uuid,
+ kube_config):
 multiprocessing.Process.__init__(self)
 self.namespace = namespace
+self.multi_namespace_mode = mult_namespace_mode
 self.worker_uuid = worker_uuid
 self.watcher_queue = watcher_queue
 self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
 kwargs[key] = value
 
 last_resource_version = None
-for event in watcher.stream(kube_client.list_namespaced_pod, 
self.namespace,
-**kwargs):
+if self.multi_namespace_mode:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_pod_for_all_namespaces,
+ **kwargs)
+else:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_namespaced_pod,
+ self.namespace,
+ **kwargs)
+for event in list_worker_pods():
 task = event['object']
 self.log.info(
 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ 

[airflow] 32/32: Makes multi-namespace mode optional (#9570)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619
Author: Daniel Imberman 
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

Makes multi-namespace mode optional (#9570)

Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman 
(cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml  |  7 ++
 airflow/config_templates/default_airflow.cfg |  4 
 airflow/executors/kubernetes_executor.py | 32 +++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
   type: string
   example: ~
   default: "default"
+- name: multi_namespace_mode
+  description: |
+Allows users to launch pods in multiple namespaces.
+Will require creating a cluster-role for the scheduler
+  type: boolean
+  example: ~
+  default: "False"
 - name: airflow_configmap
   description: |
 The name of the Kubernetes ConfigMap containing the Airflow 
Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults 
to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration 
(this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
 :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
 # cluster has RBAC enabled, your scheduler may need service account 
permissions to
 # create, watch, get, and delete pods in this namespace.
 self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+self.multi_namespace_mode = conf.get(self.kubernetes_section, 
'multi_namespace_mode')
 # The Kubernetes Namespace in which pods will be created by the 
executor. Note
 # that if your
 # cluster has RBAC enabled, your workers may need service account 
permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 """Watches for Kubernetes jobs"""
-def __init__(self, namespace, watcher_queue, resource_version, 
worker_uuid, kube_config):
+
+def __init__(self,
+ namespace,
+ mult_namespace_mode,
+ watcher_queue,
+ resource_version,
+ worker_uuid,
+ kube_config):
 multiprocessing.Process.__init__(self)
 self.namespace = namespace
+self.multi_namespace_mode = mult_namespace_mode
 self.worker_uuid = worker_uuid
 self.watcher_queue = watcher_queue
 self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
 kwargs[key] = value
 
 last_resource_version = None
-for event in watcher.stream(kube_client.list_namespaced_pod, 
self.namespace,
-**kwargs):
+if self.multi_namespace_mode:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_pod_for_all_namespaces,
+ **kwargs)
+else:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_namespaced_pod,
+ self.namespace,
+ **kwargs)
+for event in list_worker_pods():
 task = event['object']
 self.log.info(
 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@ 

[airflow] 32/32: Makes multi-namespace mode optional (#9570)

2020-08-11 Thread kaxilnaik
This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch v1-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 242d6d0e9a1955b11677d2be1b7ae5e28243e619
Author: Daniel Imberman 
AuthorDate: Mon Aug 10 13:41:40 2020 -0700

Makes multi-namespace mode optional (#9570)

Running the airflow k8sexecutor with multiple namespace abilities
requires creating a ClusterRole which can break existing deployments

Co-authored-by: Daniel Imberman 
(cherry picked from commit 2e3c878066f9241d17f2e4ba41fe0e2ba02de79e)
---
 airflow/config_templates/config.yml  |  7 ++
 airflow/config_templates/default_airflow.cfg |  4 
 airflow/executors/kubernetes_executor.py | 32 +++-
 3 files changed, 38 insertions(+), 5 deletions(-)

diff --git a/airflow/config_templates/config.yml 
b/airflow/config_templates/config.yml
index f54255e..75c47cb 100644
--- a/airflow/config_templates/config.yml
+++ b/airflow/config_templates/config.yml
@@ -1812,6 +1812,13 @@
   type: string
   example: ~
   default: "default"
+- name: multi_namespace_mode
+  description: |
+Allows users to launch pods in multiple namespaces.
+Will require creating a cluster-role for the scheduler
+  type: boolean
+  example: ~
+  default: "False"
 - name: airflow_configmap
   description: |
 The name of the Kubernetes ConfigMap containing the Airflow 
Configuration (this file)
diff --git a/airflow/config_templates/default_airflow.cfg 
b/airflow/config_templates/default_airflow.cfg
index e18e538..3a9bba2 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -838,6 +838,10 @@ worker_pods_creation_batch_size = 1
 # The Kubernetes namespace where airflow workers should be created. Defaults 
to ``default``
 namespace = default
 
+# Allows users to launch pods in multiple namespaces.
+# Will require creating a cluster-role for the scheduler
+multi_namespace_mode = False
+
 # The name of the Kubernetes ConfigMap containing the Airflow Configuration 
(this file)
 # Example: airflow_configmap = airflow-configmap
 airflow_configmap =
diff --git a/airflow/executors/kubernetes_executor.py 
b/airflow/executors/kubernetes_executor.py
index 3ad4222..7b31b45 100644
--- a/airflow/executors/kubernetes_executor.py
+++ b/airflow/executors/kubernetes_executor.py
@@ -22,6 +22,7 @@ KubernetesExecutor
 :ref:`executor:KubernetesExecutor`
 """
 import base64
+import functools
 import json
 import multiprocessing
 import time
@@ -162,6 +163,7 @@ class KubeConfig:
 # cluster has RBAC enabled, your scheduler may need service account 
permissions to
 # create, watch, get, and delete pods in this namespace.
 self.kube_namespace = conf.get(self.kubernetes_section, 'namespace')
+self.multi_namespace_mode = conf.get(self.kubernetes_section, 
'multi_namespace_mode')
 # The Kubernetes Namespace in which pods will be created by the 
executor. Note
 # that if your
 # cluster has RBAC enabled, your workers may need service account 
permissions to
@@ -254,9 +256,17 @@ class KubeConfig:
 
 class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
 """Watches for Kubernetes jobs"""
-def __init__(self, namespace, watcher_queue, resource_version, 
worker_uuid, kube_config):
+
+def __init__(self,
+ namespace,
+ mult_namespace_mode,
+ watcher_queue,
+ resource_version,
+ worker_uuid,
+ kube_config):
 multiprocessing.Process.__init__(self)
 self.namespace = namespace
+self.multi_namespace_mode = mult_namespace_mode
 self.worker_uuid = worker_uuid
 self.watcher_queue = watcher_queue
 self.resource_version = resource_version
@@ -295,8 +305,16 @@ class KubernetesJobWatcher(multiprocessing.Process, 
LoggingMixin):
 kwargs[key] = value
 
 last_resource_version = None
-for event in watcher.stream(kube_client.list_namespaced_pod, 
self.namespace,
-**kwargs):
+if self.multi_namespace_mode:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_pod_for_all_namespaces,
+ **kwargs)
+else:
+list_worker_pods = functools.partial(watcher.stream,
+ 
kube_client.list_namespaced_pod,
+ self.namespace,
+ **kwargs)
+for event in list_worker_pods():
 task = event['object']
 self.log.info(
 'Event: %s had an event of type %s',
@@ -377,8 +395,12 @@