[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor

2018-09-11 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610859#comment-16610859 ]

ASF GitHub Bot commented on AIRFLOW-2156:
-

feng-tao closed pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor task state fetching
URL: https://github.com/apache/incubator-airflow/pull/3830
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/UPDATING.md b/UPDATING.md
index 78b8327f05..0405309d62 100644
--- a/UPDATING.md
+++ b/UPDATING.md
@@ -17,6 +17,11 @@ so you might need to update your config.
 The scheduler.min_file_parsing_loop_time config option has been temporarily removed due to
 some bugs.
 
+### new `sync_parallelism` config option in celery section
+
+The new `sync_parallelism` config option will control how many processes CeleryExecutor will use to
+fetch celery task state in parallel. Default value is max(1, number of cores - 1)
+
 ## Airflow 1.10
 
 Installation and upgrading requires setting `SLUGIFY_USES_TEXT_UNIDECODE=yes` in your environment or
diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index 18c486cb1e..000dd67a13 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -380,6 +380,10 @@ flower_port = 
 # Default queue that tasks get assigned to and that worker listen on.
 default_queue = default
 
+# How many processes CeleryExecutor uses to sync task state.
+# 0 means to use max(1, number of cores - 1) processes.
+sync_parallelism = 0
+
 # Import path for celery configuration options
 celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
 
diff --git a/airflow/config_templates/default_test.cfg b/airflow/config_templates/default_test.cfg
index 06937452b0..f9279cce54 100644
--- a/airflow/config_templates/default_test.cfg
+++ b/airflow/config_templates/default_test.cfg
@@ -97,6 +97,7 @@ result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
 flower_host = 0.0.0.0
 flower_port = 
 default_queue = default
+sync_parallelism = 0
 
 [mesos]
 master = localhost:5050
diff --git a/airflow/executors/celery_executor.py b/airflow/executors/celery_executor.py
index 61bbc66716..0de48b4d39 100644
--- a/airflow/executors/celery_executor.py
+++ b/airflow/executors/celery_executor.py
@@ -17,20 +17,26 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import math
+import os
 import subprocess
 import time
-import os
+import traceback
+from multiprocessing import Pool, cpu_count
 
 from celery import Celery
 from celery import states as celery_states
 
+from airflow import configuration
 from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
 from airflow.exceptions import AirflowException
 from airflow.executors.base_executor import BaseExecutor
-from airflow import configuration
 from airflow.utils.log.logging_mixin import LoggingMixin
 from airflow.utils.module_loading import import_string
 
+# Make it constant for unit test.
+CELERY_FETCH_ERR_MSG_HEADER = 'Error fetching Celery task state'
+
 '''
 To start the celery worker, run the command:
 airflow worker
@@ -63,6 +69,42 @@ def execute_command(command):
         raise AirflowException('Celery command failed')
 
 
+class ExceptionWithTraceback(object):
+    """
+    Wrapper class used to propagate exceptions to parent processes from subprocesses.
+    :param exception: The exception to wrap
+    :type exception: Exception
+    :param traceback: The stacktrace to wrap
+    :type traceback: str
+    """
+
+    def __init__(self, exception, exception_traceback):
+        self.exception = exception
+        self.traceback = exception_traceback
+
+
+def fetch_celery_task_state(celery_task):
+    """
+    Fetch and return the state of the given celery task. The scope of this function is
+    global so that it can be called by subprocesses in the pool.
+    :param celery_task: a tuple of the Celery task key and the async Celery object used
+        to fetch the task's state
+    :type celery_task: (str, celery.result.AsyncResult)
+    :return: a tuple of the Celery task key and the Celery state of the task
+    :rtype: (str, str)
+    """
+
+    try:
+        # Accessing state property of celery task will make actual network request
+        # to get the current state of the task.
+        res = (celery_task[0], celery_task[1].state)
+    except Exception as e:
+        exception_traceback = "Celery Task ID: {}\n{}".format(celery_task[0],
+                                                              traceback.format_exc())
+        res = ExceptionWithTraceback(e, 

[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor

2018-08-31 Thread ASF GitHub Bot (JIRA)


[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16599435#comment-16599435 ]

ASF GitHub Bot commented on AIRFLOW-2156:
-

yrqls21 opened a new pull request #3830: [AIRFLOW-2156] Parallelize Celery Executor
URL: https://github.com/apache/incubator-airflow/pull/3830
 
 
   ### Jira
   
   - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW-2156) issues and references them in the PR title.
 - https://issues.apache.org/jira/browse/AIRFLOW-2156
   
   ### Description
   This change was mostly authored by @aoen. Because he changed jobs, I am only adding tests and small fixes and publishing the PR.
   
   According to Airbnb production traffic and stress tests, this change lets Airflow meet a 5-minute scheduling SLA with 30k running tasks. With 16 processors, the Celery querying step is more than 15x faster, and it could potentially get faster still with more processors.
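The speed-up comes from fanning the per-task state lookups out over a process pool instead of making them one after another. The sketch below assumes a dict of task keys to result objects that expose a `.state` attribute (as `celery.result.AsyncResult` does). `resolve_sync_parallelism`, `fetch_state`, `parallel_sync`, `StaticResult`, and the `use_threads` switch are hypothetical names for illustration, not the code merged in this PR. It resolves `sync_parallelism = 0` to max(1, number of cores - 1) as UPDATING.md describes, and gives each worker one contiguous chunk of tasks. Error handling is left out.

```python
import math
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ThreadPool


class StaticResult(object):
    """Hypothetical stand-in for celery.result.AsyncResult with a fixed state."""

    def __init__(self, state):
        self.state = state


def resolve_sync_parallelism(configured, cores=None):
    """Turn the sync_parallelism setting into a worker count; 0 means max(1, cores - 1)."""
    if configured > 0:
        return configured
    if cores is None:
        cores = cpu_count()
    return max(1, cores - 1)


def fetch_state(item):
    """Return (key, state). Defined at module level so multiprocessing can pickle it."""
    key, result = item
    # With a real AsyncResult, reading .state makes a request to the result backend.
    return key, result.state


def parallel_sync(tasks, parallelism, use_threads=False):
    """Fetch the state of every result in `tasks` (key -> result) using a worker pool."""
    num_workers = min(len(tasks), parallelism)
    if num_workers == 0:
        return {}
    # One contiguous chunk per worker instead of one item at a time,
    # assuming every lookup costs about the same.
    chunksize = int(math.ceil(len(tasks) / float(num_workers)))
    # ThreadPool has the same map() API but skips pickling; handy in unit tests.
    pool = (ThreadPool if use_threads else Pool)(num_workers)
    try:
        pairs = pool.map(fetch_state, list(tasks.items()), chunksize=chunksize)
    finally:
        pool.close()
        pool.join()
    return dict(pairs)
```

With the default process pool, every result object must be picklable, and on platforms that spawn workers the call belongs under an `if __name__ == "__main__":` guard, e.g. `parallel_sync({"t1": StaticResult("SUCCESS")}, resolve_sync_parallelism(0))`. Chunking trades load balancing for fewer round trips to the workers, which only pays off when lookups cost roughly the same.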
   
   - [x] Here are some details about my PR, including screenshots of any UI changes:
   ![screen shot 2018-06-13 at 12 01 17 am copy](https://user-images.githubusercontent.com/7818710/44940493-2bdbb300-ad44-11e8-943c-c3d2d3c5907b.png)
   
   https://user-images.githubusercontent.com/7818710/44940495-36964800-ad44-11e8-9a4a-3ff7790d14a9.png
   
   Notes:
   
   - Syncing no longer happens at the end of CeleryExecutor execution (e.g. when the scheduler shuts down). That final sync does not actually guarantee that tasks have finished, and it prolongs the shutdown sequence.
   - There is no timeout on the subprocesses, but there was no timeout before this change either.
   - The multiprocessing tasks are not logged. This is acceptable because these lookups were not logged before this change either.
   
   ### Tests
   
   - [x] My PR adds the following unit tests:
   
     - tests/executors/test_celery_executor.py:CeleryExecutorTest.test_exception_propagation
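The exception-propagation test covers the case where one state lookup fails. With a plain `Pool.map`, an exception raised in a worker is re-raised in the parent and the results for the whole batch are lost, so the worker returns the exception as data instead. Below is a minimal sketch of both sides, assuming results arrive as a list. Only `ExceptionWithTraceback` and the error-message header mirror names from the diff; `fetch_state`, `collect_states`, `StaticResult`, and `FailingResult` are hypothetical, and the built-in `map` stands in for `Pool.map` so the example runs without worker processes.

```python
import logging
import traceback

# Mirrors CELERY_FETCH_ERR_MSG_HEADER from the diff.
ERR_MSG_HEADER = "Error fetching Celery task state"


class ExceptionWithTraceback(object):
    """Carries an exception and its formatted traceback back to the parent process."""

    def __init__(self, exception, exception_traceback):
        self.exception = exception
        self.traceback = exception_traceback


class StaticResult(object):
    """Hypothetical result object whose state lookup succeeds."""

    def __init__(self, state):
        self.state = state


class FailingResult(object):
    """Hypothetical result object whose state lookup fails, e.g. backend unreachable."""

    @property
    def state(self):
        raise IOError("result backend unreachable")


def fetch_state(item):
    """Return (key, state), or an ExceptionWithTraceback instead of raising."""
    key, result = item
    try:
        return key, result.state
    except Exception as e:
        return ExceptionWithTraceback(
            e, "Celery Task ID: {}\n{}".format(key, traceback.format_exc()))


def collect_states(results, log=logging.getLogger(__name__)):
    """Build a key -> state dict, logging and skipping any wrapped failures."""
    states = {}
    for item in results:
        if isinstance(item, ExceptionWithTraceback):
            log.error("%s, ignoring it: %s\n%s",
                      ERR_MSG_HEADER, item.exception, item.traceback)
            continue
        key, state = item
        states[key] = state
    return states
```

In a real pool, `results` would be the list returned by `pool.map(fetch_state, ...)`. The traceback travels as a formatted string because traceback objects themselves cannot be pickled, while a plain exception plus a string usually can.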
   
   ### Commits
   
   - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
 1. Subject is separated from body by a blank line
 1. Subject is limited to 50 characters (not including Jira issue reference)
 1. Subject does not end with a period
 1. Subject uses the imperative mood ("add", not "adding")
 1. Body wraps at 72 characters
 1. Body explains "what" and "why", not "how"
   
   ### Documentation
   
   - [ ] In case of new functionality, my PR adds documentation that describes how to use it.
 - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.
   
   ### Code Quality
   
   - [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Parallelize Celery Executor
> ---
>
> Key: AIRFLOW-2156
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2156
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: celery
>Reporter: Dan Davydov
>Assignee: Dan Davydov
>Priority: Major
>
> The CeleryExecutor doesn't currently support parallel execution to check task 
> states since Celery does not support this. This can greatly slow down the 
> Scheduler loops since each request to check a task's state is a network 
> request.
>  
> The Celery Executor should parallelize these requests.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor

2018-04-09 Thread John Arnold (JIRA)

[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16431308#comment-16431308 ]

John Arnold commented on AIRFLOW-2156:
--

This seems like a pretty easy candidate for multiprocessing.
