Joseph Harris created AIRFLOW-1510:
--------------------------------------

             Summary: Scheduler: priority_weight sorting not applied across multiple DAGs
                 Key: AIRFLOW-1510
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.8.1
         Environment: Ubuntu, CeleryRunner
            Reporter: Joseph Harris


h3. Issue
When there are multiple available tasks across many DAGs, the order in which 
those tasks are queued does not respect the priority_weight ordering across all 
available tasks. Instead, when the number of DAGs is greater than the 
scheduler's 'max_threads' setting, the run order depends on how quickly the 
scheduler loop reaches each DAG.

This is particularly problematic when there are long-running tasks competing 
for a limited number of slots in a pool.

With over 80 DAGs in operation, increasing max_threads to match doesn't seem 
like a practical solution.


h3. What should be done
* The docs should be updated to be less misleading about how priority_weight is 
likely to behave across DAGs: https://airflow.incubator.apache.org/concepts.html#pools
* Potential scheduler improvements: force the DagFileProcessorManager to wait 
until all DAG files have been processed before queuing (slow but reliable), or 
make _execute_task_instances() look at scheduled tasks from all DAGs rather 
than only those from the last heartbeat (faster but less reliable); a rough 
sketch of the latter follows this list.
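
A hedged sketch of the second option, assuming the TaskInstance model and the 
State constants available in 1.8.1 (illustrative only, not a drop-in patch for 
SchedulerJob; it ignores the existing pool and concurrency checks):

{code:python}
# Sketch: select *all* scheduled TaskInstances, regardless of which DAGs the
# last heartbeat() returned, and order them by priority_weight before queuing.
from airflow import settings
from airflow.models import TaskInstance
from airflow.utils.state import State

session = settings.Session()
candidates = (
    session.query(TaskInstance)
           .filter(TaskInstance.state == State.SCHEDULED)
           .order_by(TaskInstance.priority_weight.desc())
           .all()
)
session.close()

# 'candidates' would then go through the usual pool / concurrency / executor
# checks, highest priority_weight first, instead of only the tasks belonging
# to the DAGs processed in the most recent scheduler heartbeat.
{code}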

h3. Example
For instance, with 4 tasks:
|| DAG || Task || Priority || Pool ||
| A | 1 | 20 | pool |
| B | 2 | 1 | pool |
| C | 3 | 1 | pool |
| D | 4 | 100 | pool |

With max_threads = 2, the scheduler would look at DAGs A & B first and send 
their tasks in order (1, 2). It would then look at DAGs C & D and send those 
tasks in order (4, 3), assuming enough pool slots are available. Task 4, with 
the highest priority_weight (100) of all four tasks, should have been queued 
first but ends up queued third.
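
For reference, a minimal sketch of how the four example tasks could be declared 
(DAG ids, task ids, the start date and the pool name 'pool' are illustrative; 
the pool must exist with a limited number of slots, and to reproduce the 
batching each DAG would live in its own file rather than one file as shown here):

{code:python}
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2017, 1, 1)}

# One single-task DAG per row of the table above, all sharing the pool 'pool'.
for dag_id, task_id, priority in [('A', 'task_1', 20), ('B', 'task_2', 1),
                                  ('C', 'task_3', 1), ('D', 'task_4', 100)]:
    dag = DAG(dag_id, default_args=default_args, schedule_interval='@once')
    DummyOperator(task_id=task_id, priority_weight=priority,
                  pool='pool', dag=dag)
    globals()[dag_id] = dag  # expose each DAG at module level so it is discovered
{code}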


h3. Current Implementation Detail
The SchedulerJob code is a bit complex, but the sequence of events in the 
scheduler loop looks like this:
* The 
[DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298]
 loops across all discovered DAG files and launches Processor threads (limited 
by max_threads). Each Processor reads in a single DAG and checks whether any 
tasks in the DAG have their dependencies met. If dependencies are met, the task 
is set to state='scheduled'.
* 
[DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409]
 returns the list of DAGs processed by the Processor threads during the last 
cycle. When max_threads = 2, this list contains at most 2 DAGs.
* These DAGs are [passed to 
SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440]
* An [ORM 
query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992]
 selects tasks where state='scheduled' and the task belongs to one of the DAGs 
returned by the last heartbeat() loop. *Only those tasks are [sorted and passed 
to the Celery 
queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035]*
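
To illustrate the consequence, a small self-contained toy model of that loop 
(plain Python, not Airflow code) applied to the example table above shows that 
the priority_weight sort never spans more than max_threads DAGs per cycle:

{code:python}
MAX_THREADS = 2

# (dag_id, task_id, priority_weight) rows from the example table
scheduled = [
    ('A', 'task_1', 20),
    ('B', 'task_2', 1),
    ('C', 'task_3', 1),
    ('D', 'task_4', 100),
]

dag_ids = [row[0] for row in scheduled]   # order in which the ProcessorManager reaches the DAG files
queued_order = []

while dag_ids:
    batch = dag_ids[:MAX_THREADS]         # DAGs returned by one heartbeat()
    dag_ids = dag_ids[MAX_THREADS:]

    # _execute_task_instances(): only tasks from this batch are considered,
    # sorted by priority_weight (descending) before being sent to the executor.
    batch_tasks = [row for row in scheduled if row[0] in batch]
    batch_tasks.sort(key=lambda row: row[2], reverse=True)
    queued_order.extend(row[1] for row in batch_tasks)

print(queued_order)  # ['task_1', 'task_2', 'task_4', 'task_3'] -- task_4 (priority 100) only third
{code}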


