[jira] [Commented] (AIRFLOW-944) Docker operator does not work with Docker >= 1.19
[ https://issues.apache.org/jira/browse/AIRFLOW-944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288739#comment-16288739 ] Russell Pierce commented on AIRFLOW-944: Potentially (at least partially) duplicated by AIRFLOW-1057, AIRFLOW-1159, and AIRFLOW-1380
> Docker operator does not work with Docker >= 1.19
> -------------------------------------------------
>
> Key: AIRFLOW-944
> URL: https://issues.apache.org/jira/browse/AIRFLOW-944
> Project: Apache Airflow
> Issue Type: Bug
> Components: docker
> Affects Versions: Airflow 1.7.1.3, 1.8.0rc4
> Environment: Ubuntu 16.04
> Reporter: Ludovic Claude
>
> The Docker operator does not work when mem_limit is set and Docker version 1.19 or more recent is used.
> Here are the logs; I have seen this issue with Airflow 1.7.1.3 and Airflow 1.8.0rc4:
>
> [2017-03-06 11:37:54,895] {docker_operator.py:132} INFO - Starting docker container from image hbpmip/mipmap
> [2017-03-06 11:37:54,902] {models.py:1417} ERROR - mem_limit has been moved to host_config in API version 1.19
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1369, in run
>     result = task_copy.execute(context=context)
>   File "/tmp/src/airflow-imaging-plugins/airflow_pipeline/operators/docker_pipeline_operator.py", line 191, in execute
>     logs = super(DockerPipelineOperator, self).execute(context)
>   File "/usr/local/lib/python3.5/dist-packages/airflow/operators/docker_operator.py", line 172, in execute
>     user=self.user
>   File "/usr/local/lib/python3.5/dist-packages/docker/api/container.py", line 133, in create_container
>     volume_driver, stop_signal, networking_config,
>   File "/usr/local/lib/python3.5/dist-packages/docker/api/container.py", line 138, in create_container_config
>     return utils.create_container_config(self._version, *args, **kwargs)
>   File "/usr/local/lib/python3.5/dist-packages/docker/utils/utils.py", line 1041, in create_container_config
>     'mem_limit has been moved to host_config in API version 1.19'
> docker.errors.InvalidVersion: mem_limit has been moved to host_config in API version 1.19
> [2017-03-06 11:37:54,903] {models.py:1433} INFO - Marking task as UP_FOR_RETRY
> [2017-03-06 11:37:54,912] {models.py:1462} ERROR - mem_limit has been moved to host_config in API version 1.19
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
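The InvalidVersion error above comes from docker-py refusing a top-level mem_limit once the negotiated API version reaches 1.19; from that version on the limit has to travel inside host_config (in docker-py, built via client.create_host_config). The helper below is a minimal, self-contained sketch of that version routing only; the function name and dict shapes are illustrative and are not Airflow's actual code:

```python
# Sketch: route mem_limit to the right place depending on the Docker API
# version. Illustrative only -- real docker-py builds host_config with
# APIClient.create_host_config(mem_limit=...).

def build_create_kwargs(image, command, mem_limit=None, api_version="1.19"):
    """Return create_container-style kwargs with mem_limit routed by API version."""
    kwargs = {"image": image, "command": command}
    # Compare dotted versions numerically ("1.19" vs "1.9" would sort wrong as strings).
    version = tuple(int(part) for part in api_version.split("."))
    if mem_limit is not None:
        if version >= (1, 19):
            # API >= 1.19: the memory limit belongs in the host configuration.
            kwargs["host_config"] = {"mem_limit": mem_limit}
        else:
            # Older APIs accepted mem_limit as a direct container argument.
            kwargs["mem_limit"] = mem_limit
    return kwargs
```

An equivalent change in the operator would pass `host_config=client.create_host_config(mem_limit=...)` instead of handing mem_limit straight to create_container.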
[jira] [Created] (AIRFLOW-1915) flask-wtf version specification is too tight
William Pursell created AIRFLOW-1915: Summary: flask-wtf version specification is too tight Key: AIRFLOW-1915 URL: https://issues.apache.org/jira/browse/AIRFLOW-1915 Project: Apache Airflow Issue Type: Wish Components: core Reporter: William Pursell Assignee: William Pursell Priority: Minor Airflow currently pins flask-wtf to 0.14, while Flask-AppBuilder requires 0.14.2. The discrepancy causes the version information to be unavailable from the web page when using the FAB-based webserver. This restriction seems unnecessary and can be fixed by relaxing the version constraint in Airflow. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (AIRFLOW-1912) The log facility airflow.processor should not propagate
[ https://issues.apache.org/jira/browse/AIRFLOW-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1912. - Resolution: Fixed Issue resolved by pull request #2871 [https://github.com/apache/incubator-airflow/pull/2871] > The log facility airflow.processor should not propagate > --- > > Key: AIRFLOW-1912 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1912 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Reporter: Bolke de Bruin >Priority: Blocker > Fix For: 1.9.1 > > > The root logger writes to stdout. If redirection is used, which is the > case for processors and task runs (not runners), this can end up in an endless > loop when propagation is True. > Therefore airflow.processor should not propagate. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
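The effect of the one-line fix (`'propagate': False` in DEFAULT_LOGGING_CONFIG) can be reproduced with plain stdlib logging: with propagation on, every record a child logger emits also reaches the root logger's handlers, which is what loops forever once stdout itself is redirected into the logger. The handler class below is an illustrative stub, not Airflow's code:

```python
import logging

# Minimal illustration of why airflow.processor must not propagate:
# records it emits should stay on its own handlers and never reach the
# root logger (whose handler writes to the redirected stdout).

class CountingHandler(logging.Handler):
    """Test double that records every log record it receives."""
    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)

# Stand-in for the root logger's stdout handler.
root_handler = CountingHandler()
logging.getLogger().addHandler(root_handler)

proc_handler = CountingHandler()
proc_logger = logging.getLogger("airflow.processor")
proc_logger.setLevel(logging.INFO)
proc_logger.addHandler(proc_handler)
proc_logger.propagate = False  # the one-line change from PR #2871

proc_logger.info("processed a DAG file")
```

With `propagate = True` the same record would land in both handlers, re-entering the redirected stream and looping.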
[jira] [Commented] (AIRFLOW-1912) The log facility airflow.processor should not propagate
[ https://issues.apache.org/jira/browse/AIRFLOW-1912?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16288351#comment-16288351 ] ASF subversion and git services commented on AIRFLOW-1912: -- Commit 1d7fac6e049c19ddd0866aca3fad712fce94b4dc in incubator-airflow's branch refs/heads/master from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=1d7fac6 ] [AIRFLOW-1912] airflow.processor should not propagate logging The root logger will write to stdout. If redirection is used which is the case for processors and task runs (not runners) this can end up in an endless loop in case propagation is True. Closes #2871 from bolkedebruin/AIRFLOW-1912 > The log facility airflow.processor should not propagate > --- > > Key: AIRFLOW-1912 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1912 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Reporter: Bolke de Bruin >Priority: Blocker > Fix For: 1.9.1 > > > The root logger will write to stdout. If redirection is used which is the > case for processors and task runs (not runners) this can end up in an endless > loop in case propagation is True. > airflow.processor should not propagate therefore. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1912] airflow.processor should not propagate logging
Repository: incubator-airflow Updated Branches: refs/heads/master 815270bb5 -> 1d7fac6e0 [AIRFLOW-1912] airflow.processor should not propagate logging The root logger will write to stdout. If redirection is used which is the case for processors and task runs (not runners) this can end up in an endless loop in case propagation is True. Closes #2871 from bolkedebruin/AIRFLOW-1912 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1d7fac6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1d7fac6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1d7fac6e Branch: refs/heads/master Commit: 1d7fac6e049c19ddd0866aca3fad712fce94b4dc Parents: 815270b Author: Bolke de BruinAuthored: Tue Dec 12 23:05:47 2017 +0100 Committer: Bolke de Bruin Committed: Tue Dec 12 23:05:47 2017 +0100 -- airflow/config_templates/airflow_local_settings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1d7fac6e/airflow/config_templates/airflow_local_settings.py -- diff --git a/airflow/config_templates/airflow_local_settings.py b/airflow/config_templates/airflow_local_settings.py index aa5b8da..e5f4198 100644 --- a/airflow/config_templates/airflow_local_settings.py +++ b/airflow/config_templates/airflow_local_settings.py @@ -90,7 +90,7 @@ DEFAULT_LOGGING_CONFIG = { 'airflow.processor': { 'handlers': ['file.processor'], 'level': LOG_LEVEL, -'propagate': True, +'propagate': False, }, 'airflow.task': { 'handlers': ['file.task'],
[jira] [Created] (AIRFLOW-1914) email utils in airflow does not support multibyte string content
edward created AIRFLOW-1914: --- Summary: email utils in airflow does not support multibyte string content Key: AIRFLOW-1914 URL: https://issues.apache.org/jira/browse/AIRFLOW-1914 Project: Apache Airflow Issue Type: Bug Components: operators Affects Versions: Airflow 2.0 Reporter: edward The built-in email utils do not support multibyte string content, for example Japanese. Issue in file: https://github.com/apache/incubator-airflow/blob/master/airflow/utils/email.py#L73 >>> mime_text = MIMEText(html_content, 'html') The charset is not passed in, so the default charset "us-ascii" is used. The fix is to pass in the right charset based on html_content: >>> mime_text = MIMEText(html_content, 'html', "us-ascii" if isinstance(html_content, str) else "utf-8") -- This message was sent by Atlassian JIRA (v6.4.14#64029)
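The charset behaviour described above can be checked with the stdlib email package alone; the variable names below are illustrative:

```python
from email.mime.text import MIMEText

html_ascii = "<p>hello</p>"
html_ja = "<p>こんにちは</p>"  # multibyte (Japanese) content

# ASCII content with no explicit charset is labelled us-ascii, which is
# the default the report points at in airflow/utils/email.py.
plain = MIMEText(html_ascii, "html")

# Passing the charset explicitly makes the encoding deterministic and
# safe for multibyte content; utf-8 round-trips the Japanese text.
fixed = MIMEText(html_ja, "html", "utf-8")
```

Note that the conditional in the suggested fix distinguishes ASCII from non-ASCII input; always passing "utf-8" would be a simpler variant, at the cost of base64-encoding ASCII-only mails too.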
[jira] [Assigned] (AIRFLOW-1889) Move the sensors to a separate package
[ https://issues.apache.org/jira/browse/AIRFLOW-1889?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andy Hadjigeorgiou reassigned AIRFLOW-1889: --- Assignee: Fokko Driesprong (was: Andy Hadjigeorgiou) > Move the sensors to a separate package > -- > > Key: AIRFLOW-1889 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1889 > Project: Apache Airflow > Issue Type: Bug >Reporter: Fokko Driesprong >Assignee: Fokko Driesprong > > Right now all the sensors are in a single file, which clutters up the > codebase. By moving the sensors to a separate package, the list of available > sensors will be more transparent and easier to maintain. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (AIRFLOW-1913) Add Delete method for GCP Pub/Sub Topics and Create/Delete for Subscriptions
Jason Prodonovich created AIRFLOW-1913: -- Summary: Add Delete method for GCP Pub/Sub Topics and Create/Delete for Subscriptions Key: AIRFLOW-1913 URL: https://issues.apache.org/jira/browse/AIRFLOW-1913 Project: Apache Airflow Issue Type: Improvement Components: contrib, gcp, hooks Affects Versions: Airflow 1.8 Reporter: Jason Prodonovich Assignee: Jason Prodonovich There have been requests for Operators that pull messages from Google Cloud Pub/Sub and then act upon the receipt of those messages. In order to facilitate that, additional capabilities must be added to the PubSubHook class that allow for creating/deleting Subscriptions and adding the ability to delete a Topic from a project as well. These capabilities allow for Workflows and end-to-end tests to perform all of the necessary setup and cleanup when creating, publishing, receiving, and removing messages to and from Cloud PubSub topics. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (AIRFLOW-1911) Update Airflow config
[ https://issues.apache.org/jira/browse/AIRFLOW-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16287546#comment-16287546 ] ASF subversion and git services commented on AIRFLOW-1911: -- Commit 815270bb56255e0e0653f7bbfeb7d34d2e8c780b in incubator-airflow's branch refs/heads/master from [~Fokko] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=815270b ] [AIRFLOW-1911] Rename celeryd_concurrency There are still celeryd_concurrency occurrences left in the code this needs to be renamed to worker_concurrency to make the config with Celery consistent Closes #2870 from Fokko/AIRFLOW-1911-update- airflow-config > Update Airflow config > - > > Key: AIRFLOW-1911 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1911 > Project: Apache Airflow > Issue Type: Bug >Reporter: Fokko Driesprong > Fix For: 1.9.1 > > > Based on Joy's observation's there are still some celeryd_concurrency > occurrences left in the code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (AIRFLOW-1911) Update Airflow config
[ https://issues.apache.org/jira/browse/AIRFLOW-1911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-1911. --- Resolution: Fixed Fix Version/s: 1.9.1 Issue resolved by pull request #2870 [https://github.com/apache/incubator-airflow/pull/2870] > Update Airflow config > - > > Key: AIRFLOW-1911 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1911 > Project: Apache Airflow > Issue Type: Bug >Reporter: Fokko Driesprong > Fix For: 1.9.1 > > > Based on Joy's observations, there are still some celeryd_concurrency > occurrences left in the code. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
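The rename is mechanical but easy to miss in deployed airflow.cfg files. A quick stdlib check that a config already uses the new [celery] keys; the cfg snippet is illustrative, with values matching the project's default test config:

```python
import configparser

# Post-AIRFLOW-1911 key names: celeryd_concurrency -> worker_concurrency,
# celery_result_backend -> result_backend. Snippet values are illustrative.
NEW_CFG = """
[celery]
celery_app_name = airflow.executors.celery_executor
worker_concurrency = 16
result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
"""

parser = configparser.ConfigParser()
parser.read_string(NEW_CFG)

# A deployment still carrying the old key would fail this lookup.
concurrency = parser.getint("celery", "worker_concurrency")
```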
incubator-airflow git commit: [AIRFLOW-1911] Rename celeryd_concurrency
Repository: incubator-airflow Updated Branches: refs/heads/master be54f0485 -> 815270bb5 [AIRFLOW-1911] Rename celeryd_concurrency There are still celeryd_concurrency occurrences left in the code this needs to be renamed to worker_concurrency to make the config with Celery consistent Closes #2870 from Fokko/AIRFLOW-1911-update- airflow-config Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/815270bb Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/815270bb Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/815270bb Branch: refs/heads/master Commit: 815270bb56255e0e0653f7bbfeb7d34d2e8c780b Parents: be54f04 Author: Fokko DriesprongAuthored: Tue Dec 12 13:47:55 2017 +0100 Committer: Fokko Driesprong Committed: Tue Dec 12 13:47:55 2017 +0100 -- airflow/bin/cli.py| 2 +- airflow/config_templates/default_test.cfg | 4 ++-- docs/configuration.rst| 4 ++-- scripts/ci/airflow_travis.cfg | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/815270bb/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 812977b..3e954dc 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -1394,7 +1394,7 @@ class CLIFactory(object): ("-c", "--concurrency"), type=int, help="The number of worker processes", -default=conf.get('celery', 'celeryd_concurrency')), +default=conf.get('celery', 'worker_concurrency')), 'celery_hostname': Arg( ("-cn", "--celery_hostname"), help=("Set the hostname of celery worker " http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/815270bb/airflow/config_templates/default_test.cfg -- diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg index b065313..1e8a7df 100644 --- a/airflow/config_templates/default_test.cfg +++ b/airflow/config_templates/default_test.cfg @@ -72,10 +72,10 @@ 
smtp_mail_from = airf...@example.com [celery] celery_app_name = airflow.executors.celery_executor -celeryd_concurrency = 16 +worker_concurrency = 16 worker_log_server_port = 8793 broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow -celery_result_backend = db+mysql://airflow:airflow@localhost:3306/airflow +result_backend = db+mysql://airflow:airflow@localhost:3306/airflow flower_host = 0.0.0.0 flower_port = default_queue = default http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/815270bb/docs/configuration.rst -- diff --git a/docs/configuration.rst b/docs/configuration.rst index 51984e0..61f5511 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -35,7 +35,7 @@ You can also derive the connection string at run time by appending ``_cmd`` to t [core] sql_alchemy_conn_cmd = bash_command_to_run --But only three such configuration elements namely sql_alchemy_conn, broker_url and result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows - +-But only three such configuration elements namely sql_alchemy_conn, broker_url and result_backend can be fetched as a command. The idea behind this is to not store passwords on boxes in plain text files. The order of precedence is as follows - 1. environment variable 2. 
configuration in airflow.cfg @@ -159,7 +159,7 @@ Some caveats: - Make sure to use a database backed result backend - Make sure to set a visibility timeout in [celery_broker_transport_options] that exceeds the ETA of your longest running task -- Tasks can and consume resources, make sure your worker as enough resources to run `celeryd_concurrency` tasks +- Tasks can and consume resources, make sure your worker as enough resources to run `worker_concurrency` tasks Scaling Out with Dask ' http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/815270bb/scripts/ci/airflow_travis.cfg -- diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg index ee29148..c1ced74 100644 --- a/scripts/ci/airflow_travis.cfg +++ b/scripts/ci/airflow_travis.cfg @@ -43,7 +43,7 @@ smtp_mail_from = airf...@example.com [celery] celery_app_name = airflow.executors.celery_executor -celeryd_concurrency = 16
[jira] [Resolved] (AIRFLOW-1885) IndexError when polling ready workers and a gunicorn worker becomes a zombie
[ https://issues.apache.org/jira/browse/AIRFLOW-1885?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1885. - Resolution: Fixed Fix Version/s: 1.9.1 Issue resolved by pull request #2844 [https://github.com/apache/incubator-airflow/pull/2844] > IndexError when polling ready workers and a gunicorn worker becomes a zombie > > > Key: AIRFLOW-1885 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1885 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: Airflow 1.8 >Reporter: John Barker >Assignee: John Barker > Fix For: 1.9.1 > > > If one of the gunicorn workers happens to become a zombie between > {{children()}} and {{cmdline()}} calls to psutil in > {{get_num_ready_workers_running}} will raise an IndexError: > {code} > Traceback (most recent call last): > File "/usr/local/bin/airflow", line 28, in > args.func(args) > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 803, > in webserver > restart_workers(gunicorn_master_proc, num_workers) > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 687, > in restart_workers > num_ready_workers_running = > get_num_ready_workers_running(gunicorn_master_proc) > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 663, > in get_num_ready_workers_running > proc for proc in workers > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 664, > in > if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] > IndexError: list index out of range > {code} > In version 4.2 of psutil, {{cmdline}} can return an empty array if the > process is zombied: > https://github.com/giampaolo/psutil/blob/release-4.2.0/psutil/_pslinux.py#L1007 > so one must ensure that an array is returned with at least one item from > {{cmdline}} before doing the {{in}} check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
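The guard that PR #2844 introduces can be illustrated without psutil: the essential change is refusing to index cmdline[0] when a zombied worker reports an empty cmdline. The worker stubs below are hypothetical, and the prefix constant stands in for the real value in airflow.settings:

```python
# Sketch of the fixed ready-worker check. In Airflow, cmdline comes from
# psutil.Process.cmdline(), which returns [] for a zombie process.
GUNICORN_WORKER_READY_PREFIX = "[ready] "  # stand-in for airflow.settings value

def ready_prefix_on_cmdline(cmdline):
    """True only for a non-empty cmdline whose first entry carries the ready prefix."""
    # Indexing cmdline[0] unconditionally is exactly what raised
    # IndexError before the fix; guard on length first.
    return len(cmdline) > 0 and GUNICORN_WORKER_READY_PREFIX in cmdline[0]

workers = [
    ["[ready] gunicorn: worker"],  # healthy worker that signalled readiness
    ["gunicorn: worker"],          # still booting, not ready yet
    [],                            # zombie: empty cmdline from psutil
]
ready_count = sum(ready_prefix_on_cmdline(c) for c in workers)
```

The merged fix additionally wraps the cmdline() call in try/except psutil.NoSuchProcess, treating a vanished process as a non-ready worker.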
[jira] [Commented] (AIRFLOW-1885) IndexError when polling ready workers and a gunicorn worker becomes a zombie
[ https://issues.apache.org/jira/browse/AIRFLOW-1885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16287492#comment-16287492 ] ASF subversion and git services commented on AIRFLOW-1885: -- Commit be54f0485eb0ec52b3147bea057b399565601e10 in incubator-airflow's branch refs/heads/master from [~johnbarker] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=be54f04 ] [AIRFLOW-1885] Fix IndexError in ready_prefix_on_cmdline If while trying to obtain a list of ready gunicorn workers, one of them becomes a zombie, psutil.cmdline returns [] (see here: https://github.com/giampaolo/psutil/blob/release-4 .2.0/psutil/_pslinux.py#L1007) Boom: Traceback (most recent call last): File "/usr/local/bin/airflow", line 28, in args.func(args) File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 803, in webserver restart_workers(gunicorn_master_proc, num_workers) File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 687, in restart_workers num_ready_workers_running = get_num_ready_workers_ running(gunicorn_master_proc) File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 663, in get_num_ready_workers_running proc for proc in workers File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 664, in if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] IndexError: list index out of range So ensure a cmdline is actually returned before doing the cmdline prefix check in ready_prefix_on_cmdline. 
Also: * Treat psutil.NoSuchProcess error as non ready worker * Add in tests for get_num_ready_workers_running Closes #2844 from j16r/bugfix/poll_zombie_process > IndexError when polling ready workers and a gunicorn worker becomes a zombie > > > Key: AIRFLOW-1885 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1885 > Project: Apache Airflow > Issue Type: Bug >Affects Versions: Airflow 1.8 >Reporter: John Barker >Assignee: John Barker > Fix For: 1.9.1 > > > If one of the gunicorn workers happens to become a zombie between > {{children()}} and {{cmdline()}} calls to psutil in > {{get_num_ready_workers_running}} will raise an IndexError: > {code} > Traceback (most recent call last): > File "/usr/local/bin/airflow", line 28, in > args.func(args) > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 803, > in webserver > restart_workers(gunicorn_master_proc, num_workers) > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 687, > in restart_workers > num_ready_workers_running = > get_num_ready_workers_running(gunicorn_master_proc) > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 663, > in get_num_ready_workers_running > proc for proc in workers > File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 664, > in > if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] > IndexError: list index out of range > {code} > In version 4.2 of psutil, {{cmdline}} can return an empty array if the > process is zombied: > https://github.com/giampaolo/psutil/blob/release-4.2.0/psutil/_pslinux.py#L1007 > so one must ensure that an array is returned with at least one item from > {{cmdline}} before doing the {{in}} check. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1885] Fix IndexError in ready_prefix_on_cmdline
Repository: incubator-airflow Updated Branches: refs/heads/master 3e6babe8e -> be54f0485 [AIRFLOW-1885] Fix IndexError in ready_prefix_on_cmdline If while trying to obtain a list of ready gunicorn workers, one of them becomes a zombie, psutil.cmdline returns [] (see here: https://github.com/giampaolo/psutil/blob/release-4 .2.0/psutil/_pslinux.py#L1007) Boom: Traceback (most recent call last): File "/usr/local/bin/airflow", line 28, in args.func(args) File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 803, in webserver restart_workers(gunicorn_master_proc, num_workers) File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 687, in restart_workers num_ready_workers_running = get_num_ready_workers_ running(gunicorn_master_proc) File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 663, in get_num_ready_workers_running proc for proc in workers File "/usr/local/lib/python3.5/dist- packages/airflow/bin/cli.py", line 664, in if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] IndexError: list index out of range So ensure a cmdline is actually returned before doing the cmdline prefix check in ready_prefix_on_cmdline. 
Also: * Treat psutil.NoSuchProcess error as non ready worker * Add in tests for get_num_ready_workers_running Closes #2844 from j16r/bugfix/poll_zombie_process Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/be54f048 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/be54f048 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/be54f048 Branch: refs/heads/master Commit: be54f0485eb0ec52b3147bea057b399565601e10 Parents: 3e6babe Author: John BarkerAuthored: Tue Dec 12 12:49:06 2017 +0100 Committer: Bolke de Bruin Committed: Tue Dec 12 12:49:06 2017 +0100 -- airflow/bin/cli.py| 24 --- tests/cli/__init__.py | 13 ++ tests/cli/test_cli.py | 59 ++ 3 files changed, 88 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/be54f048/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 1367362..812977b 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -553,6 +553,22 @@ def clear(args): include_subdags=not args.exclude_subdags) +def get_num_ready_workers_running(gunicorn_master_proc): +workers = psutil.Process(gunicorn_master_proc.pid).children() + +def ready_prefix_on_cmdline(proc): +try: +cmdline = proc.cmdline() +if len(cmdline) > 0: +return settings.GUNICORN_WORKER_READY_PREFIX in cmdline[0] +except psutil.NoSuchProcess: +pass +return False + +ready_workers = [proc for proc in workers if ready_prefix_on_cmdline(proc)] +return len(ready_workers) + + def restart_workers(gunicorn_master_proc, num_workers_expected): """ Runs forever, monitoring the child processes of @gunicorn_master_proc and @@ -590,14 +606,6 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): workers = psutil.Process(gunicorn_master_proc.pid).children() return len(workers) -def get_num_ready_workers_running(gunicorn_master_proc): -workers = psutil.Process(gunicorn_master_proc.pid).children() 
-ready_workers = [ -proc for proc in workers -if settings.GUNICORN_WORKER_READY_PREFIX in proc.cmdline()[0] -] -return len(ready_workers) - def start_refresh(gunicorn_master_proc): batch_size = conf.getint('webserver', 'worker_refresh_batch_size') log.debug('%s doing a refresh of %s workers', state, batch_size) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/be54f048/tests/cli/__init__.py -- diff --git a/tests/cli/__init__.py b/tests/cli/__init__.py new file mode 100644 index 000..9d7677a --- /dev/null +++ b/tests/cli/__init__.py @@ -0,0 +1,13 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language
[jira] [Commented] (AIRFLOW-1854) Improve Spark submit hook for cluster mode
[ https://issues.apache.org/jira/browse/AIRFLOW-1854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16287490#comment-16287490 ] ASF subversion and git services commented on AIRFLOW-1854: -- Commit 3e6babe8ed8f8f281b67aa3f4e03bf3cfc1bcbaa in incubator-airflow's branch refs/heads/master from [~milanvdm] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=3e6babe ] [AIRFLOW-1854] Improve Spark Submit operator for standalone cluster mode Closes #2852 from milanvdmria/svend/submit2 > Improve Spark submit hook for cluster mode > -- > > Key: AIRFLOW-1854 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1854 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Reporter: Milan van der Meer >Assignee: Milan van der Meer >Priority: Minor > Labels: features > Fix For: 1.9.1 > > > *We are already working on this issue and will submit a PR soon* > When executing a Spark submit to a standalone cluster using the Spark submit > hook, it will get a return code from the Spark submit action and not the > Spark job itself. > This means when a Spark submit is executed and successfully received by the > cluster, the Airflow job will be successful, even when the Spark job fails on > the cluster later on. > Suggested solution: > * When you execute a Spark submit in cluster mode, the logs will contain a > driver ID. > * Use this driver ID to poll the cluster for the driver state. > * Based on the drivers state, the job will be successful or failed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Resolved] (AIRFLOW-1854) Improve Spark submit hook for cluster mode
[ https://issues.apache.org/jira/browse/AIRFLOW-1854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-1854. - Resolution: Fixed Fix Version/s: 1.9.1 Issue resolved by pull request #2852 [https://github.com/apache/incubator-airflow/pull/2852] > Improve Spark submit hook for cluster mode > -- > > Key: AIRFLOW-1854 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1854 > Project: Apache Airflow > Issue Type: Improvement > Components: hooks >Reporter: Milan van der Meer >Assignee: Milan van der Meer >Priority: Minor > Labels: features > Fix For: 1.9.1 > > > *We are already working on this issue and will submit a PR soon* > When executing a Spark submit to a standalone cluster using the Spark submit > hook, it gets a return code from the spark-submit action, not from the > Spark job itself. > This means that when a Spark submit is executed and successfully received by the > cluster, the Airflow task will be marked successful, even if the Spark job fails on > the cluster later on. > Suggested solution: > * When you execute a Spark submit in cluster mode, the logs will contain a > driver ID. > * Use this driver ID to poll the cluster for the driver state. > * Based on the driver's state, mark the job successful or failed. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
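The suggested polling approach can be sketched in isolation. The driver-ID pattern and state names below are assumptions modelled on Spark standalone master output, not the merged hook's exact code:

```python
import re

# Sketch: (1) pull the driver ID out of spark-submit's log output,
# (2) map a polled driver state to task success/failure.
# Pattern and states are illustrative; exact formats vary by Spark version.
DRIVER_ID_RE = re.compile(r"(driver-\d{14}-\d{4})")

def extract_driver_id(submit_log):
    """Return the standalone-cluster driver ID from spark-submit output, or None."""
    match = DRIVER_ID_RE.search(submit_log)
    return match.group(1) if match else None

def job_succeeded(driver_state):
    """Only a FINISHED driver counts as success; FAILED/KILLED/ERROR do not."""
    return driver_state == "FINISHED"

# Hypothetical log line in the shape a standalone master produces.
log = "... Driver successfully submitted as driver-20171212123456-0001 ..."
driver_id = extract_driver_id(log)
```

A poller built on these pieces would loop on the cluster's status endpoint with the extracted ID until a terminal state appears, then set the Airflow task state from job_succeeded.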
incubator-airflow git commit: [AIRFLOW-1854] Improve Spark Submit operator for standalone cluster mode
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 22453d037 -> 3e6babe8e

[AIRFLOW-1854] Improve Spark Submit operator for standalone cluster mode

Closes #2852 from milanvdmria/svend/submit2

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3e6babe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3e6babe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3e6babe8
Branch: refs/heads/master
Commit: 3e6babe8ed8f8f281b67aa3f4e03bf3cfc1bcbaa
Parents: 22453d0
Author: milanvdmria
Authored: Tue Dec 12 12:45:41 2017 +0100
Committer: Bolke de Bruin
Committed: Tue Dec 12 12:45:52 2017 +0100

----
 airflow/contrib/hooks/spark_submit_hook.py      | 217 ---
 .../contrib/operators/spark_submit_operator.py  |  17 +-
 tests/contrib/hooks/test_spark_submit_hook.py   | 175 ---
 .../operators/test_spark_submit_operator.py     |  19 +-
 4 files changed, 354 insertions(+), 74 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3e6babe8/airflow/contrib/hooks/spark_submit_hook.py

diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py
index c0bc84f..16e14b4 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -15,6 +15,7 @@
 import os
 import subprocess
 import re
+import time

 from airflow.hooks.base_hook import BaseHook
 from airflow.exceptions import AirflowException
@@ -42,15 +43,20 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     :type jars: str
     :param java_class: the main class of the Java application
     :type java_class: str
-    :param packages: Comma-separated list of maven coordinates of jars to include on the driver and executor classpaths
+    :param packages: Comma-separated list of maven coordinates of jars to include on the
+        driver and executor classpaths
     :type packages: str
-    :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude while resolving the dependencies provided in 'packages'
+    :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude
+        while resolving the dependencies provided in 'packages'
     :type exclude_packages: str
-    :param repositories: Comma-separated list of additional remote repositories to search for the maven coordinates given with 'packages'
+    :param repositories: Comma-separated list of additional remote repositories to search
+        for the maven coordinates given with 'packages'
     :type repositories: str
-    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors (Default: all the available cores on the worker)
+    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors
+        (Default: all the available cores on the worker)
     :type total_executor_cores: int
-    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
+    :param executor_cores: (Standalone & YARN only) Number of cores per executor
+        (Default: 2)
     :type executor_cores: int
     :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
     :type executor_memory: str
@@ -110,12 +116,25 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
         self._num_executors = num_executors
         self._application_args = application_args
         self._verbose = verbose
-        self._sp = None
+        self._submit_sp = None
         self._yarn_application_id = None
         self._connection = self._resolve_connection()
         self._is_yarn = 'yarn' in self._connection['master']
+        self._should_track_driver_status = self._resolve_should_track_driver_status()
+        self._driver_id = None
+        self._driver_status = None
+
+    def _resolve_should_track_driver_status(self):
+        """
+        Determines whether or not this hook should poll the spark driver status through
+        subsequent spark-submit status requests after the initial spark-submit request
+        :return: if the driver status should be tracked
+        """
+        return ('spark://' in self._connection['master'] and
+                self._connection['deploy_mode'] == 'cluster')
+
     def _resolve_connection(self):
         # Build from connection master or default to yarn if not available
         conn_data = {'master': 'yarn',
@@ -149,21 +168,27 @@ class SparkSubmitHook(BaseHook, LoggingMixin):
     def get_conn(self):
         pass

-    def _build_command(self, application):
-        """
-        Construct the spark-submit command to
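The `_resolve_should_track_driver_status` method in the diff above gates driver polling on two conditions: a standalone master (`spark://`) and cluster deploy mode. A standalone re-implementation of that predicate, assuming a connection dict of the same shape the hook builds:

```python
def should_track_driver_status(connection):
    """Driver status is only pollable when submitting to a Spark
    standalone master (spark://) in cluster deploy mode; YARN, Mesos,
    and client-mode submissions report status through other channels."""
    return ('spark://' in connection['master'] and
            connection['deploy_mode'] == 'cluster')
```

In client mode spark-submit stays attached to the job, so its exit code is already meaningful; only the standalone-cluster case needs the extra status requests.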
[jira] [Commented] (AIRFLOW-1908) Celery broker transport options are not set correctly
[ https://issues.apache.org/jira/browse/AIRFLOW-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16287486#comment-16287486 ]

ASF subversion and git services commented on AIRFLOW-1908:
----------------------------------------------------------

Commit 22453d037ec69b3e5ab1cda4717a3dea9c47df56 in incubator-airflow's branch refs/heads/master from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=22453d0 ]

[AIRFLOW-1908] Fix celery broker options config load

Options were set to the visibility timeout instead of broker_options directly. Furthermore, options should be int, float, bool, or string, not all string.

Closes #2867 from bolkedebruin/AIRFLOW-1908

> Celery broker transport options are not set correctly
> -----------------------------------------------------
>
> Key: AIRFLOW-1908
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1908
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Bolke de Bruin
> Fix For: 1.9.1
>
> broker_transport_options': {'visibility_timeout': broker_transport_options},
> furthermore some of the items need to be int

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Resolved] (AIRFLOW-1908) Celery broker transport options are not set correctly
[ https://issues.apache.org/jira/browse/AIRFLOW-1908?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Bolke de Bruin resolved AIRFLOW-1908.
-------------------------------------
    Resolution: Fixed
    Fix Version/s: 1.9.1

Issue resolved by pull request #2867
[https://github.com/apache/incubator-airflow/pull/2867]

> Celery broker transport options are not set correctly
> -----------------------------------------------------
>
> Key: AIRFLOW-1908
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1908
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Bolke de Bruin
> Fix For: 1.9.1
>
> broker_transport_options': {'visibility_timeout': broker_transport_options},
> furthermore some of the items need to be int

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1908] Fix celery broker options config load
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 683a27f2c -> 22453d037

[AIRFLOW-1908] Fix celery broker options config load

Options were set to the visibility timeout instead of broker_options directly. Furthermore, options should be int, float, bool, or string, not all string.

Closes #2867 from bolkedebruin/AIRFLOW-1908

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/22453d03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/22453d03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/22453d03
Branch: refs/heads/master
Commit: 22453d037ec69b3e5ab1cda4717a3dea9c47df56
Parents: 683a27f
Author: Bolke de Bruin
Authored: Tue Dec 12 12:44:06 2017 +0100
Committer: Bolke de Bruin
Committed: Tue Dec 12 12:44:06 2017 +0100

----
 airflow/config_templates/default_celery.py |  2 +-
 airflow/configuration.py                   | 23 ++-
 scripts/ci/airflow_travis.cfg              |  6 ++
 tests/configuration.py                     | 14 +-
 4 files changed, 42 insertions(+), 3 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/config_templates/default_celery.py

diff --git a/airflow/config_templates/default_celery.py b/airflow/config_templates/default_celery.py
index 3309cbe..57b9611 100644
--- a/airflow/config_templates/default_celery.py
+++ b/airflow/config_templates/default_celery.py
@@ -32,7 +32,7 @@ DEFAULT_CELERY_CONFIG = {
     'task_default_queue': configuration.get('celery', 'DEFAULT_QUEUE'),
     'task_default_exchange': configuration.get('celery', 'DEFAULT_QUEUE'),
     'broker_url': configuration.get('celery', 'BROKER_URL'),
-    'broker_transport_options': {'visibility_timeout': broker_transport_options},
+    'broker_transport_options': broker_transport_options,
     'result_backend': configuration.get('celery', 'RESULT_BACKEND'),
     'worker_concurrency': configuration.getint('celery', 'WORKER_CONCURRENCY'),
 }

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/airflow/configuration.py

diff --git a/airflow/configuration.py b/airflow/configuration.py
index ed63952..2bb2a49 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -28,6 +28,8 @@ import sys

 from future import standard_library

+from six import iteritems
+
 from airflow.utils.log.logging_mixin import LoggingMixin

 standard_library.install_aliases()
@@ -237,8 +239,27 @@ class AirflowConfigParser(ConfigParser):
         self._validate()

     def getsection(self, section):
+        """
+        Returns the section as a dict. Values are converted to int, float, bool
+        as required.
+        :param section: section from the config
+        :return: dict
+        """
         if section in self._sections:
-            return self._sections[section]
+            _section = self._sections[section]
+            for key, val in iteritems(self._sections[section]):
+                try:
+                    val = int(val)
+                except ValueError:
+                    try:
+                        val = float(val)
+                    except ValueError:
+                        if val.lower() in ('t', 'true'):
+                            val = True
+                        elif val.lower() in ('f', 'false'):
+                            val = False
+                _section[key] = val
+            return _section
         return None

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/scripts/ci/airflow_travis.cfg

diff --git a/scripts/ci/airflow_travis.cfg b/scripts/ci/airflow_travis.cfg
index b71947e..ee29148 100644
--- a/scripts/ci/airflow_travis.cfg
+++ b/scripts/ci/airflow_travis.cfg
@@ -50,6 +50,12 @@
 result_backend = db+mysql://root@localhost/airflow
 flower_port =
 default_queue = default

+[celery_broker_transport_options]
+visibility_timeout = 21600
+_test_only_bool = True
+_test_only_float = 12.0
+_test_only_string = this is a test
+
 [scheduler]
 job_heartbeat_sec = 1
 scheduler_heartbeat_sec = 5

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/22453d03/tests/configuration.py

diff --git a/tests/configuration.py b/tests/configuration.py
index bb0fd17..300205b 100644
--- a/tests/configuration.py
+++ b/tests/configuration.py
@@ -13,12 +13,14 @@
 # limitations under the License.

 from __future__ import print_function

-import os
 import unittest

+import six
+
 from
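The `getsection` change above coerces the raw config strings to int, float, or bool, and the order of the fallthrough matters: int is tried before float (so "21600" stays integral), and the boolean check runs only after both numeric conversions fail. A minimal standalone version of that cascade, for a single value:

```python
def coerce(val):
    """Convert a raw config string to int, float, or bool where possible,
    mirroring the int -> float -> bool fallthrough in the patch above.
    Unconvertible values are returned unchanged as strings."""
    try:
        return int(val)
    except ValueError:
        pass
    try:
        return float(val)
    except ValueError:
        pass
    if val.lower() in ('t', 'true'):
        return True
    if val.lower() in ('f', 'false'):
        return False
    return val
```

Applied to the `[celery_broker_transport_options]` test section in the diff, this yields `visibility_timeout` as an int, `_test_only_bool` as a bool, `_test_only_float` as a float, and `_test_only_string` untouched, which is exactly what the broker options need.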
[jira] [Created] (AIRFLOW-1912) The log facility airflow.processor should not propagate
Bolke de Bruin created AIRFLOW-1912:
---------------------------------------
Summary: The log facility airflow.processor should not propagate
Key: AIRFLOW-1912
URL: https://issues.apache.org/jira/browse/AIRFLOW-1912
Project: Apache Airflow
Issue Type: Bug
Components: scheduler
Reporter: Bolke de Bruin
Priority: Blocker
Fix For: 1.9.1

The root logger writes to stdout. If redirection is used, as is the case for processors and task runs (not runners), this can end up in an endless loop when propagation is enabled. Therefore airflow.processor should not propagate.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
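The fix described in the ticket amounts to disabling propagation on the `airflow.processor` logger, so its records are handled locally instead of bubbling up to the root handler (whose stdout stream is what gets redirected back into the logging system). A minimal sketch of the mechanism, using only the standard library:

```python
import logging

# If stdout is redirected into the logging system, any record that
# propagates up to the root logger's stdout handler is re-captured and
# re-logged, forming the endless loop the ticket describes.
processor_logger = logging.getLogger('airflow.processor')
processor_logger.propagate = False  # handle records here, never at root
```

In Airflow's actual logging config this is expressed declaratively (a `propagate: False` entry for the logger), but the effect is the same as the attribute assignment shown here.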
[jira] [Resolved] (AIRFLOW-1907) Allow setting the max_ingestion_time by the Druid operator
[ https://issues.apache.org/jira/browse/AIRFLOW-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Fokko Driesprong resolved AIRFLOW-1907.
---------------------------------------
    Resolution: Fixed
    Fix Version/s: 1.9.1

Issue resolved by pull request #2866
[https://github.com/apache/incubator-airflow/pull/2866]

> Allow setting the max_ingestion_time by the Druid operator
> ----------------------------------------------------------
>
> Key: AIRFLOW-1907
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1907
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Fokko Driesprong
> Fix For: 1.9.1

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (AIRFLOW-1907) Allow setting the max_ingestion_time by the Druid operator
[ https://issues.apache.org/jira/browse/AIRFLOW-1907?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16287346#comment-16287346 ]

ASF subversion and git services commented on AIRFLOW-1907:
----------------------------------------------------------

Commit 683a27f2c16e036b42226cb9d96012d0616d0aa0 in incubator-airflow's branch refs/heads/master from [~Fokko]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=683a27f ]

[AIRFLOW-1907] Pass max_ingestion_time to Druid hook

From the Druid operator we want to pass the max_ingestion_time to the hook, since some jobs might take considerably more time than others. By default we don't want to set a max ingestion time.

Closes #2866 from Fokko/AIRFLOW-1907-pass-max-ingestion-time

> Allow setting the max_ingestion_time by the Druid operator
> ----------------------------------------------------------
>
> Key: AIRFLOW-1907
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1907
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Fokko Driesprong

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
incubator-airflow git commit: [AIRFLOW-1907] Pass max_ingestion_time to Druid hook
Repository: incubator-airflow
Updated Branches:
  refs/heads/master c70d8f59c -> 683a27f2c

[AIRFLOW-1907] Pass max_ingestion_time to Druid hook

From the Druid operator we want to pass the max_ingestion_time to the hook, since some jobs might take considerably more time than others. By default we don't want to set a max ingestion time.

Closes #2866 from Fokko/AIRFLOW-1907-pass-max-ingestion-time

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/683a27f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/683a27f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/683a27f2
Branch: refs/heads/master
Commit: 683a27f2c16e036b42226cb9d96012d0616d0aa0
Parents: c70d8f5
Author: Fokko Driesprong
Authored: Tue Dec 12 10:40:46 2017 +0100
Committer: Fokko Driesprong
Committed: Tue Dec 12 10:40:46 2017 +0100

----
 airflow/contrib/operators/druid_operator.py | 8 ++--
 airflow/hooks/druid_hook.py                 | 4 ++--
 2 files changed, 8 insertions(+), 4 deletions(-)
----

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/contrib/operators/druid_operator.py

diff --git a/airflow/contrib/operators/druid_operator.py b/airflow/contrib/operators/druid_operator.py
index 6978cc3..965dc50 100644
--- a/airflow/contrib/operators/druid_operator.py
+++ b/airflow/contrib/operators/druid_operator.py
@@ -34,10 +34,11 @@ class DruidOperator(BaseOperator):
         self,
         json_index_file,
         druid_ingest_conn_id='druid_ingest_default',
+        max_ingestion_time=None,
         *args, **kwargs):
-
         super(DruidOperator, self).__init__(*args, **kwargs)
         self.conn_id = druid_ingest_conn_id
+        self.max_ingestion_time = max_ingestion_time

         with open(json_index_file) as data_file:
             index_spec = json.load(data_file)
@@ -49,6 +50,9 @@ class DruidOperator(BaseOperator):
         )

     def execute(self, context):
-        hook = DruidHook(druid_ingest_conn_id=self.conn_id)
+        hook = DruidHook(
+            druid_ingest_conn_id=self.conn_id,
+            max_ingestion_time=self.max_ingestion_time
+        )
         self.log.info("Sumitting %s", self.index_spec_str)
         hook.submit_indexing_job(self.index_spec_str)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/683a27f2/airflow/hooks/druid_hook.py

diff --git a/airflow/hooks/druid_hook.py b/airflow/hooks/druid_hook.py
index 655f666..9ce1f9a 100644
--- a/airflow/hooks/druid_hook.py
+++ b/airflow/hooks/druid_hook.py
@@ -36,7 +36,7 @@ class DruidHook(BaseHook):
         self,
         druid_ingest_conn_id='druid_ingest_default',
         timeout=1,
-        max_ingestion_time=18000):
+        max_ingestion_time=None):

         self.druid_ingest_conn_id = druid_ingest_conn_id
         self.timeout = timeout
@@ -72,7 +72,7 @@ class DruidHook(BaseHook):

         sec = sec + 1

-        if sec > self.max_ingestion_time:
+        if self.max_ingestion_time and sec > self.max_ingestion_time:
             # ensure that the job gets killed if the max ingestion time is exceeded
             requests.post("{0}/{1}/shutdown".format(url, druid_task_id))
             raise AirflowException('Druid ingestion took more than %s seconds', self.max_ingestion_time)
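The hook change above makes `max_ingestion_time=None` mean "wait indefinitely": the guard `if self.max_ingestion_time and sec > self.max_ingestion_time` only fires when a limit is actually set. A condensed sketch of that polling guard, where `check_status` stands in (hypothetically) for the Druid task-status request the real hook makes:

```python
def wait_for_ingestion(check_status, max_ingestion_time=None):
    """Poll check_status() once per 'second' of elapsed time; abort only
    if a time limit is set and exceeded, mirroring the patched hook.
    Returns the number of polls that returned False before completion."""
    sec = 0
    while not check_status():
        sec += 1
        # None disables the limit entirely (falsy), so the loop waits forever
        if max_ingestion_time and sec > max_ingestion_time:
            raise TimeoutError(
                'Druid ingestion took more than %s seconds' % max_ingestion_time)
    return sec
```

The real hook also sleeps between polls and POSTs to the task's shutdown endpoint before raising, which is omitted here to isolate the timeout logic.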