Joseph Harris created AIRFLOW-1510:
--------------------------------------

             Summary: Scheduler: priority_weight sorting not applied across multiple DAGs
                 Key: AIRFLOW-1510
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1510
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.8.1
         Environment: Ubuntu, CeleryRunner
            Reporter: Joseph Harris
h3. Issue

When there are multiple available tasks across many DAGs, the order in which those tasks are queued does not respect the priority_weight order across all available tasks. Instead, when the number of DAGs is greater than the scheduler's 'max_threads' setting, the run order depends on how quickly the scheduler loop reaches each DAG. This is particularly problematic when long-running tasks are competing over a limited number of slots in a pool. With over 80 DAGs in operation, increasing max_threads to that number doesn't seem like a practical solution.

h3. What should be done

* The docs should be updated to be less misleading about how priority_weight is likely to behave: https://airflow.incubator.apache.org/concepts.html#pools
* Potential implementation improvements in the scheduler: force the ProcessorManager to wait for all jobs to be processed (slow but reliable), or make _execute_task_instances() look at tasks from all DAGs (faster but less reliable).

h3. Example

For instance, with 4 tasks:

|| DAG || Task || Priority || Pool ||
| A | 1 | 20 | pool |
| B | 2 | 1 | pool |
| C | 3 | 1 | pool |
| D | 4 | 100 | pool |

With max_threads = 2, the scheduler would look at DAGs A & B first and send their tasks in priority order (1, 2). It would then look at tasks 3 & 4 and send these in order (4, 3), if there are enough pool slots available. Task 4, despite having the highest priority overall, is queued after tasks 1 and 2.

h3. Current Implementation Detail

The SchedulerJob code is a bit complex, but the sequence of events in the scheduler loop looks like this:

* The [DagFileProcessorManager|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/utils/dag_processing.py#L298] loops across all discovered DAG files and launches Processor threads (limited by max_threads). Each Processor reads in a single DAG and checks whether any tasks in the DAG have their dependencies met. If dependencies are met, the task is set to state='scheduled'.
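The effect in the example above can be illustrated with a minimal, self-contained sketch (not Airflow code; all names are hypothetical): sorting by priority only within each batch of max_threads DAGs produces a different dispatch order than a single global sort.

```python
# Hypothetical sketch (not Airflow code): simulate how per-heartbeat
# batches of DAGs break global priority_weight ordering.

# (dag_id, task_id, priority_weight) from the example table
tasks = [("A", 1, 20), ("B", 2, 1), ("C", 3, 1), ("D", 4, 100)]

MAX_THREADS = 2  # stands in for the scheduler's max_threads setting


def dispatch_order(tasks, batch_size):
    """Sort by priority only within each batch of DAGs, which is
    effectively what the 1.8.1 scheduler loop does."""
    order = []
    for i in range(0, len(tasks), batch_size):
        batch = tasks[i:i + batch_size]
        batch.sort(key=lambda t: -t[2])  # highest priority first
        order.extend(task_id for _, task_id, _ in batch)
    return order


print(dispatch_order(tasks, MAX_THREADS))  # [1, 2, 4, 3] — observed behaviour
print(dispatch_order(tasks, len(tasks)))   # [4, 1, 2, 3] — expected global order
```

Task 4 only wins within its own batch; tasks 1 and 2 have already been queued by then, which is exactly the problem when they occupy the pool's slots.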
* [DagFileProcessorManager.heartbeat()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1409] returns the list of DAGs collected from the Processor threads during its last cycle. When max_threads = 2, this list will contain a maximum of 2 DAGs.
* These DAGs are [passed to SchedulerJob._execute_task_instances()|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1440].
* An [ORM query|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L992] selects tasks where state='scheduled' and the task belongs to one of the DAGs returned by the last heartbeat() loop. *Only those tasks are [sorted and passed to the Celery queue|https://github.com/apache/incubator-airflow/blob/1.8.1/airflow/jobs.py#L1035]*

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
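The scoping described in the last two bullets can be emulated with a short sketch (hypothetical names and data structures, not the real ORM query): because the query filters on the DAGs from the last heartbeat, the priority sort never spans all scheduled tasks.

```python
# Hypothetical sketch (not the real ORM code): _execute_task_instances()
# only sees 'scheduled' tasks from the DAGs in the last heartbeat batch,
# so the priority sort never spans all DAGs.

scheduled = [
    {"dag_id": "A", "task_id": 1, "priority_weight": 20,  "state": "scheduled"},
    {"dag_id": "B", "task_id": 2, "priority_weight": 1,   "state": "scheduled"},
    {"dag_id": "C", "task_id": 3, "priority_weight": 1,   "state": "scheduled"},
    {"dag_id": "D", "task_id": 4, "priority_weight": 100, "state": "scheduled"},
]


def tasks_to_queue(task_table, heartbeat_dags):
    # Emulates the WHERE state='scheduled' AND dag_id IN (...) filter
    candidates = [t for t in task_table
                  if t["state"] == "scheduled" and t["dag_id"] in heartbeat_dags]
    # Emulates the sort before queueing: only these candidates compete
    # for pool slots, highest priority_weight first
    return sorted(candidates, key=lambda t: -t["priority_weight"])


batch = tasks_to_queue(scheduled, {"A", "B"})
print([t["task_id"] for t in batch])  # [1, 2] — task 4 (priority 100) never considered
```

Passing all DAG ids in one call would yield the globally correct order, which is essentially what the second proposed improvement (making _execute_task_instances() look at tasks from all DAGs) amounts to.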