[
https://issues.apache.org/jira/browse/AIRFLOW-2755?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16570751#comment-16570751
]
ASF GitHub Bot commented on AIRFLOW-2755:
-
Fokko closed pull request #3612: [AIRFLOW-2755] Added
`kubernetes.worker_dags_folder` configuration
URL: https://github.com/apache/incubator-airflow/pull/3612
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index be286ea3dc..d4a7242118 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -562,6 +562,7 @@ elasticsearch_end_of_log_mark = end_of_log
worker_container_repository =
worker_container_tag =
worker_container_image_pull_policy = IfNotPresent
+worker_dags_folder =
# If True (default), worker pods will be deleted upon termination
delete_worker_pods = True
diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py
index 788d925c38..66c600ba65 100644
--- a/airflow/contrib/executors/kubernetes_executor.py
+++ b/airflow/contrib/executors/kubernetes_executor.py
@@ -115,6 +115,8 @@ def __init__(self):
self.kubernetes_section, 'worker_container_repository')
self.worker_container_tag = configuration.get(
self.kubernetes_section, 'worker_container_tag')
+self.worker_dags_folder = configuration.get(
+self.kubernetes_section, 'worker_dags_folder')
self.kube_image = '{}:{}'.format(
self.worker_container_repository, self.worker_container_tag)
self.kube_image_pull_policy = configuration.get(
diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py
index c9f86b047a..88a5cf0a40 100644
--- a/airflow/contrib/kubernetes/worker_configuration.py
+++ b/airflow/contrib/kubernetes/worker_configuration.py
@@ -81,12 +81,14 @@ def _get_init_containers(self, volume_mounts):
def _get_environment(self):
"""Defines any necessary environment variables for the pod executor"""
env = {
-'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags',
-'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor',
-'AIRFLOW__CORE__SQL_ALCHEMY_CONN': conf.get('core', 'SQL_ALCHEMY_CONN')
+"AIRFLOW__CORE__EXECUTOR": "LocalExecutor",
+"AIRFLOW__CORE__SQL_ALCHEMY_CONN": conf.get("core", "SQL_ALCHEMY_CONN"),
}
+
if self.kube_config.airflow_configmap:
env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
+if self.kube_config.worker_dags_folder:
+env['AIRFLOW__CORE__DAGS_FOLDER'] = self.kube_config.worker_dags_folder
return env
def _get_secrets(self):
diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml
index 97556bf840..3e64ae4e47 100644
--- a/scripts/ci/kubernetes/kube/configmaps.yaml
+++ b/scripts/ci/kubernetes/kube/configmaps.yaml
@@ -179,6 +179,7 @@ data:
worker_container_repository = airflow
worker_container_tag = latest
worker_container_image_pull_policy = IfNotPresent
+worker_dags_folder = /tmp/dags
delete_worker_pods = True
git_repo = https://github.com/apache/incubator-airflow.git
git_branch = master
diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py
index 963efcb03b..d9da48ce3b 100644
--- a/tests/contrib/executors/test_kubernetes_executor.py
+++ b/tests/contrib/executors/test_kubernetes_executor.py
@@ -133,6 +133,22 @@ def test_worker_with_subpaths(self):
"subPath should've been passed to volumeMount configuration"
)
+def test_worker_environment_no_dags_folder(self):
+self.kube_config.worker_dags_folder = ''
+worker_config = WorkerConfiguration(self.kube_config)
+env = worker_config._get_environment()
+
+self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env)
+
+def test_worker_environment_when_dags_folder_specified(self):
+dags_folder = '/workers/path/to/dags'
+self.kube_config.worker_dags_folder = dags_folder
+
+worker_config = WorkerConfiguration(self.kube_config)
+env = worker_config._get_environment()
+
+self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER'])
+
if __name__ == '__main__':
unittest.main()
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above