Fokko closed pull request #3612: [AIRFLOW-2755] Added `kubernetes.worker_dags_folder` configuration URL: https://github.com/apache/incubator-airflow/pull/3612
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index be286ea3dc..d4a7242118 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -562,6 +562,7 @@ elasticsearch_end_of_log_mark = end_of_log worker_container_repository = worker_container_tag = worker_container_image_pull_policy = IfNotPresent +worker_dags_folder = # If True (default), worker pods will be deleted upon termination delete_worker_pods = True diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index 788d925c38..66c600ba65 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -115,6 +115,8 @@ def __init__(self): self.kubernetes_section, 'worker_container_repository') self.worker_container_tag = configuration.get( self.kubernetes_section, 'worker_container_tag') + self.worker_dags_folder = configuration.get( + self.kubernetes_section, 'worker_dags_folder') self.kube_image = '{}:{}'.format( self.worker_container_repository, self.worker_container_tag) self.kube_image_pull_policy = configuration.get( diff --git a/airflow/contrib/kubernetes/worker_configuration.py b/airflow/contrib/kubernetes/worker_configuration.py index c9f86b047a..88a5cf0a40 100644 --- a/airflow/contrib/kubernetes/worker_configuration.py +++ b/airflow/contrib/kubernetes/worker_configuration.py @@ -81,12 +81,14 @@ def _get_init_containers(self, volume_mounts): def _get_environment(self): """Defines any necessary environment variables for the pod executor""" env = { - 'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags', - 'AIRFLOW__CORE__EXECUTOR': 
'LocalExecutor', - 'AIRFLOW__CORE__SQL_ALCHEMY_CONN': conf.get('core', 'SQL_ALCHEMY_CONN') + "AIRFLOW__CORE__EXECUTOR": "LocalExecutor", + "AIRFLOW__CORE__SQL_ALCHEMY_CONN": conf.get("core", "SQL_ALCHEMY_CONN"), } + if self.kube_config.airflow_configmap: env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home + if self.kube_config.worker_dags_folder: + env['AIRFLOW__CORE__DAGS_FOLDER'] = self.kube_config.worker_dags_folder return env def _get_secrets(self): diff --git a/scripts/ci/kubernetes/kube/configmaps.yaml b/scripts/ci/kubernetes/kube/configmaps.yaml index 97556bf840..3e64ae4e47 100644 --- a/scripts/ci/kubernetes/kube/configmaps.yaml +++ b/scripts/ci/kubernetes/kube/configmaps.yaml @@ -179,6 +179,7 @@ data: worker_container_repository = airflow worker_container_tag = latest worker_container_image_pull_policy = IfNotPresent + worker_dags_folder = /tmp/dags delete_worker_pods = True git_repo = https://github.com/apache/incubator-airflow.git git_branch = master diff --git a/tests/contrib/executors/test_kubernetes_executor.py b/tests/contrib/executors/test_kubernetes_executor.py index 963efcb03b..d9da48ce3b 100644 --- a/tests/contrib/executors/test_kubernetes_executor.py +++ b/tests/contrib/executors/test_kubernetes_executor.py @@ -133,6 +133,22 @@ def test_worker_with_subpaths(self): "subPath should've been passed to volumeMount configuration" ) + def test_worker_environment_no_dags_folder(self): + self.kube_config.worker_dags_folder = '' + worker_config = WorkerConfiguration(self.kube_config) + env = worker_config._get_environment() + + self.assertNotIn('AIRFLOW__CORE__DAGS_FOLDER', env) + + def test_worker_environment_when_dags_folder_specified(self): + dags_folder = '/workers/path/to/dags' + self.kube_config.worker_dags_folder = dags_folder + + worker_config = WorkerConfiguration(self.kube_config) + env = worker_config._get_environment() + + self.assertEqual(dags_folder, env['AIRFLOW__CORE__DAGS_FOLDER']) + if __name__ == '__main__': unittest.main() 
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services