Repository: incubator-airflow Updated Branches: refs/heads/master 683a27f2c -> 22453d037
[AIRFLOW-1908] Fix celery broker options config load. The configured options were nested under the visibility_timeout key instead of being passed directly as broker_transport_options. Furthermore, option values should be converted to int, float, bool or string rather than all being left as strings. Closes #2867 from bolkedebruin/AIRFLOW-1908 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/22453d03 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/22453d03 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/22453d03 Branch: refs/heads/master Commit: 22453d037ec69b3e5ab1cda4717a3dea9c47df56 Parents: 683a27f Author: Bolke de Bruin <bo...@xs4all.nl> Authored: Tue Dec 12 12:44:06 2017 +0100 Committer: Bolke de Bruin <bo...@xs4all.nl> Committed: Tue Dec 12 12:44:06 2017 +0100 ---------------------------------------------------------------------- airflow/config_templates/default_celery.py | 2 +- airflow/configuration.py | 23 ++++++++++++++++++++++- scripts/ci/airflow_travis.cfg | 6 ++++++ tests/configuration.py | 14 +++++++++++++- 4 files changed, 42 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/config_templates/default_celery.py ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py index 3309cbe..57b9611 100644 --- a/airflow/config_templates/default_celery.py +++ b/airflow/config_templates/default_celery.py @@ -32,7 +32,7 @@ DEFAULT_CELERY_CONFIG = { 'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'), 'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'), 'broker_url': configuration.get('celery', 'BROKER_URL'), - 'broker_transport_options': {'visibility_timeout': broker_transport_options}, + 'broker_transport_options': broker_transport_options, 'result_backend': 
configuration.get('celery', 'RESULT_BACKEND'), 'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'), } http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index ed63952..2bb2a49 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -28,6 +28,8 @@ import sys from future import standard_library +from six import iteritems + from airflow.utils.log.logging_mixin import LoggingMixin standard_library.install_aliases() @@ -237,8 +239,27 @@ class AirflowConfigParser(ConfigParser): self._validate() def getsection(self, section): + """ + Returns the section as a dict. Values are converted to int, float, bool + as required. + :param section: section from the config + :return: dict + """ if section in self._sections: - return self._sections[section] + _section = self._sections[section] + for key, val in iteritems(self._sections[section]): + try: + val = int(val) + except ValueError: + try: + val = float(val) + except ValueError: + if val.lower() in ('t', 'true'): + val = True + elif val.lower() in ('f', 'false'): + val = False + _section[key] = val + return _section return None http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/scripts/ci/airflow_travis.cfg ---------------------------------------------------------------------- diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg index b71947e..ee29148 100644 --- a/scripts/ci/airflow_travis.cfg +++ b/scripts/ci/airflow_travis.cfg @@ -50,6 +50,12 @@ result_backend = db+mysql://root@localhost/airflow flower_port = 5555 default_queue = default +[celery_broker_transport_options] +visibility_timeout = 21600 +_test_only_bool = True +_test_only_float = 12.0 +_test_only_string = this is a test + [scheduler] job_heartbeat_sec = 1 scheduler_heartbeat_sec = 5 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/tests/configuration.py ---------------------------------------------------------------------- diff --git a/tests/configuration.py b/tests/configuration.py index bb0fd17..300205b 100644 --- a/tests/configuration.py +++ b/tests/configuration.py @@ -13,12 +13,14 @@ # limitations under the License. from __future__ import print_function -import os import unittest +import six + from airflow import configuration from airflow.configuration import conf + class ConfTest(unittest.TestCase): def setup(self): @@ -52,3 +54,13 @@ class ConfTest(unittest.TestCase): cfg_dict = conf.as_dict(display_sensitive=True, display_source=True) self.assertEqual( cfg_dict['testsection']['testkey'], ('testvalue', 'env var')) + + def test_broker_transport_options(self): + section_dict = conf.getsection("celery_broker_transport_options") + self.assertTrue(isinstance(section_dict['visibility_timeout'], int)) + + self.assertTrue(isinstance(section_dict['_test_only_bool'], bool)) + + self.assertTrue(isinstance(section_dict['_test_only_float'], float)) + + self.assertTrue(isinstance(section_dict['_test_only_string'], six.string_types))