[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610859#comment-16610859 ]

ASF GitHub Bot commented on AIRFLOW-2156:

feng-tao closed pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor task state fetching
URL: https://github.com/apache/incubator-airflow/pull/3830

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/UPDATING.md b/UPDATING.md
index 78b8327f05..0405309d62 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -17,6 +17,11 @@ so you might need to update your config.

 The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to some bugs.

+### new `sync_parallelism` config option in celery section
+
+The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to
+fetch celery task state in parallel. Default value is max(1, number of cores - 1)
+
 ## Airflow 1.10

 Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 18c486cb1e..000dd67a13 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -380,6 +380,10 @@ flower_port =

 # Default queue that tasks get assigned to and that worker listen on.
 default_queue = default

+# How many processes CeleryExecutor uses to sync task state.
+# 0 means to use max(1, number of cores - 1) processes.
+sync_parallelism = 0
+
 # Import path for celery configuration options
 celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 06937452b0..f9279cce54 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -97,6 +97,7 @@ result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
 flower_host = 0.0.0.0
 flower_port =
 default_queue = default
+sync_parallelism = 0

 [mesos]
 master = localhost:5050
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 61bbc66716..0de48b4d39 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -17,20 +17,26 @@
 # specific language governing permissions and limitations
 # under the License.

+import math
+import os
 import subprocess
 import time
-import os
+import traceback
+from multiprocessing import Pool, cpu_count

 from celery import Celery
 from celery import states as celery_states

+from airflow import configuration
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
-from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string

+# Make it constant for unit test.
+CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
+
 '''
 To start the celery worker, run the command:
 airflow worker
@@ -63,6 +69,42 @@ def execute_command(command):
         raise AirflowException('Celery command failed')


+class ExceptionWithTraceback(object):
+    """
+    Wrapper class used to propogate exceptions to parent processes from subprocesses.
+    :param exception: The exception to wrap
+    :type exception: Exception
+    :param traceback: The stacktrace to wrap
+    :type traceback: str
+    """
+
+    def __init__(self, exception, exception_traceback):
+        self.exception = exception
+        self.traceback = exception_traceback
+
+
+def fetch_celery_task_state(celery_task):
+    """
+    Fetch and return the state of the given celery task. The scope of this function is
+    global so that it can be called by subprocesses in the pool.
+    :param celery_task: a tuple of the Celery task key and the async Celery object used
+        to fetch the task's state
+    :type celery_task: (str, celery.result.AsyncResult)
+    :return: a tuple of the Celery task key and the Celery state of the task
+    :rtype: (str, str)
+    """
+
+    try:
+        # Accessing state property of celery task will make actual network request
+        # to get the current state of the task.
+        res = (celery_task[0], celery_task[1].state)
+    except Exception as e:
+        exception_traceback = "Celery Task ID: {}\n{}".format(celery_task[0],
+                                                              traceback.format_exc())
+        res = ExceptionWithTraceback(e,
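The diff is cut off mid-statement, so the code that actually drives the pool is not shown. As a minimal sketch of the pattern it sets up, under stated assumptions: `resolve_sync_parallelism`, `fetch_state`, `split_results`, and `FakeAsyncResult` are hypothetical names, `FakeAsyncResult` stands in for `celery.result.AsyncResult` so no broker is needed, and the wrapper is assumed to receive the formatted traceback string. It shows how a `sync_parallelism` of 0 could resolve to max(1, number of cores - 1), and how a failed lookup comes back as an `ExceptionWithTraceback` value instead of raising, so one bad task does not abort the whole batch.

```python
import traceback
from multiprocessing import cpu_count


class ExceptionWithTraceback(object):
    """Same shape as the wrapper in the diff: an exception plus its formatted stack."""

    def __init__(self, exception, exception_traceback):
        self.exception = exception
        self.traceback = exception_traceback


def resolve_sync_parallelism(configured):
    """Hypothetical helper: turn the config value into a process count.

    0 follows the documented default of max(1, number of cores - 1).
    Rejecting negative values is an assumption of this sketch.
    """
    if configured < 0:
        raise ValueError("sync_parallelism must be >= 0")
    if configured == 0:
        return max(1, cpu_count() - 1)
    return configured


def fetch_state(task):
    """Hypothetical stand-in for fetch_celery_task_state.

    Returns (key, state) on success, or an ExceptionWithTraceback on failure.
    """
    key, async_result = task
    try:
        return key, async_result.state
    except Exception as e:
        tb = "Celery Task ID: {}\n{}".format(key, traceback.format_exc())
        return ExceptionWithTraceback(e, tb)


def split_results(results):
    """Hypothetical helper: separate successful states from wrapped errors."""
    states, errors = {}, []
    for res in results:
        if isinstance(res, ExceptionWithTraceback):
            errors.append(res)
        else:
            key, state = res
            states[key] = state
    return states, errors


class FakeAsyncResult(object):
    """Test double for celery.result.AsyncResult: .state returns or raises."""

    def __init__(self, state=None, error=None):
        self._state = state
        self._error = error

    @property
    def state(self):
        if self._error is not None:
            raise self._error
        return self._state


tasks = [
    ("task_a", FakeAsyncResult(state="SUCCESS")),
    ("task_b", FakeAsyncResult(error=ConnectionError("broker unreachable"))),
    ("task_c", FakeAsyncResult(state="PENDING")),
]
# A serial list comprehension here; a pool's map would take its place.
states, errors = split_results([fetch_state(t) for t in tasks])
```

The list comprehension stands in for a pool's `map`. Because `multiprocessing.Pool.map` pickles the callable to send it to worker processes, the real function has to live at module scope, which is what the docstring in the diff means by "global so that it can be called by subprocesses in the pool".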
[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599435#comment-16599435 ]

ASF GitHub Bot commented on AIRFLOW-2156:

yrqls21 opened a new pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830

### Jira

- [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW-2156) issues and references them in the PR title.
  - https://issues.apache.org/jira/browse/AIRFLOW-2156

### Description

This change was mostly authored by @aoen. I am only doing the tests, a small fix, and publishing the PR because he changed jobs.

Based on Airbnb production traffic and stress tests, this change would let Airflow meet a 5-minute scheduling SLA with 30k running tasks. With 16 processors the celery querying step is 15x+ faster, and it could potentially get faster still with more processors.

- [x] Here are some details about my PR, including screenshots of any UI changes:

![screen shot 2018-06-13 at 12 01 17 am copy](https://user-images.githubusercontent.com/7818710/44940493-2bdbb300-ad44-11e8-943c-c3d2d3c5907b.png)
https://user-images.githubusercontent.com/7818710/44940495-36964800-ad44-11e8-9a4a-3ff7790d14a9.png

Notes:
- Syncing no longer happens at the end of the celery executor's execution (e.g. if the scheduler shuts down). The sync does not actually guarantee that tasks have finished anyway, and it prolongs the shutdown protocol.
- There is no timeout on the subprocesses, but there was none before this change either.
- What about logging for the multiprocessing tasks? It's OK to skip it; these requests aren't currently logged either.

### Tests

- [x] My PR adds the following unit tests:
  tests/executors/test_celery_executor.py:CeleryExecutorTest.test_exception_propagation

### Commits

- [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue.
  In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  1. Subject is limited to 50 characters (not including Jira issue reference)
  1. Subject does not end with a period
  1. Subject uses the imperative mood ("add", not "adding")
  1. Body wraps at 72 characters
  1. Body explains "what" and "why", not "how"

### Documentation

- [ ] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality

- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Parallelize Celery Executor
> ---
>
> Key: AIRFLOW-2156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2156
> Project: Apache Airflow
> Issue Type: Improvement
> Components: celery
> Reporter: Dan Davydov
> Assignee: Dan Davydov
> Priority: Major
>
> The CeleryExecutor doesn't currently support parallel execution to check task
> states since Celery does not support this. This can greatly slow down the
> Scheduler loops since each request to check a task's state is a network
> request.
>
> The Celery Executor should parallelize these requests.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
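The issue's reasoning is that every state check is its own network request, so checking N tasks one after another costs about N round trips. A minimal sketch of that effect, assuming a fixed simulated latency (`LATENCY_S` and `check_state` are hypothetical; no Celery backend is involved), times a serial loop against a pooled `map`. It substitutes `multiprocessing.pool.ThreadPool`, which exposes the same `map` interface as the process `Pool` imported in the PR, because a thread waiting in `time.sleep` (like one blocked on a socket) does not hold the GIL; the PR itself uses processes.

```python
import time
from multiprocessing.pool import ThreadPool

LATENCY_S = 0.05  # assumed round trip per state request (hypothetical)


def check_state(task_key):
    """Hypothetical stand-in for one network-bound state lookup."""
    time.sleep(LATENCY_S)  # simulate waiting on the result backend
    return task_key, "SUCCESS"


def fetch_serial(keys):
    # One request after another: total time grows with len(keys).
    return [check_state(k) for k in keys]


def fetch_parallel(keys, workers):
    # Pool.map preserves input order, so results line up with keys.
    with ThreadPool(processes=workers) as pool:
        return pool.map(check_state, keys)


def timed(fn, *args):
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


keys = ["task_{}".format(i) for i in range(16)]
serial_result, serial_s = timed(fetch_serial, keys)
parallel_result, parallel_s = timed(fetch_parallel, keys, 8)
print("serial: {:.2f}s, parallel: {:.2f}s".format(serial_s, parallel_s))
```

With 16 tasks and 8 workers, the pooled run should take roughly two round trips instead of sixteen; actual timings depend on the machine.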
[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16431308#comment-16431308 ]

John Arnold commented on AIRFLOW-2156:

This seems like a pretty easy candidate for multiprocessing.