gdevanla opened a new issue #8691:
URL: https://github.com/apache/airflow/issues/8691


   **Apache Airflow version**:
   1.10.3 
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   
   **Environment**:
   Python 3.7.4
   
   - **Cloud provider or hardware configuration**:
   - **OS** (e.g. from /etc/os-release):
   Ubuntu xenial/bionic
   - **Kernel** (e.g. `uname -a`): 
   Linux 4.15.0-45-generic #48~16.04.1-Ubuntu SMP Tue Jan 29 18:03:48 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux
   
   - **Install tools**:
   - **Others**:
   
   **What happened**:
   
   The task_instance gets stuck in the `scheduled` state because of inconsistent expectations about how entries in `queued_tasks` that failed to be queued successfully by the `CeleryExecutor` should be handled.
   
   Given a TaskInstance `TI` whose state is `None`, the following pseudo code is executed inside the scheduler loop. In some situations `TI` ends up stuck in the `scheduled` state.
   
   (The indentation below depicts the call stack.)
   ```
   Given a task_instance `TI` with `state == None`:

   execute_helper (scheduler loop)
       (first iteration of the scheduler loop)
       - calls `_execute_task_instances`
           - calls `_find_executable_task_instances()`, which returns `TI` since its state is `None`
           - calls `_change_state_for_executable_task_instances`, which sets `TI`'s state to `queued`
           - calls `_enqueue_task_instances_with_queued_state`, which adds `TI` to `Executor.queued_tasks`
       - calls `CeleryExecutor.heartbeat`
           Tries to `send_task` to the worker. If this succeeds, `TI` is popped from
           `CeleryExecutor.queued_tasks`. But in our scenario the `CeleryExecutor` leaves
           the entry in `queued_tasks` intact, because either an `Exception` was raised or
           `result` was `None`. The `CeleryExecutor` assumes the scheduler will handle this
           scenario. This is where the problem starts (see second iteration below; the
           link to this code is provided below).
       - calls `_change_state_for_tasks_failed_to_execute`
           This function notices that the entry for `TI` is still in
           `CeleryExecutor.queued_tasks`, assumes something went wrong, and therefore
           correctly sets `TI`'s state back to `scheduled`. Note that the entry for `TI`
           is still in `queued_tasks`, and that is what causes the current issue (see
           second iteration below).

       - other maintenance activities happen in the scheduler loop (not relevant to
         this issue)

       (second iteration of the scheduler loop)
       - calls `_execute_task_instances`
           - calls `_find_executable_task_instances()`
               Now, this function is supposed to return `TI`, since it is in the
               `scheduled` state. But it finds that an entry for `TI` already exists in
               `CeleryExecutor.queued_tasks` and therefore does not return `TI` (refer to
               the link below, which points to this check). This means `TI` will never be
               `queued` and is stuck in the `scheduled` state.
               (https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033)
   ```
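
   To make the interaction above concrete, here is a minimal, self-contained Python sketch of the described behaviour. It is not the Airflow source; `ToyExecutor` and `scheduler_iteration` are illustrative stand-ins for `CeleryExecutor` and the relevant pieces of the scheduler loop.

   ```python
# Minimal, self-contained model of the interaction described above.
# NOT the Airflow source: ToyExecutor / scheduler_iteration are illustrative
# stand-ins for CeleryExecutor and the relevant scheduler-loop steps.

class ToyExecutor:
    def __init__(self):
        self.queued_tasks = {}  # stands in for CeleryExecutor.queued_tasks
        self.running = set()    # stands in for CeleryExecutor.running

    def queue_task(self, ti_key):
        self.queued_tasks[ti_key] = "airflow run ..."

    def heartbeat(self, send_ok):
        # Mirrors the behaviour described above: on a successful send the
        # entry is popped and moved to `running`; on failure (Exception or
        # result is None) the entry is left in `queued_tasks`.
        for ti_key in list(self.queued_tasks):
            if send_ok:
                self.queued_tasks.pop(ti_key)
                self.running.add(ti_key)


def scheduler_iteration(executor, ti_state, ti_key, send_ok):
    # _find_executable_task_instances: skips any TI the executor already
    # knows about -- this is the problematic check.
    if ti_key in executor.queued_tasks or ti_key in executor.running:
        return ti_state  # TI is never re-queued -> stuck in `scheduled`

    # _change_state_for_executable_task_instances + enqueue
    ti_state = "queued"
    executor.queue_task(ti_key)

    executor.heartbeat(send_ok)

    # _change_state_for_tasks_failed_to_execute: anything still sitting in
    # queued_tasks is set back to `scheduled`, but the dict entry remains.
    if ti_key in executor.queued_tasks:
        ti_state = "scheduled"
    return ti_state


executor = ToyExecutor()
state = scheduler_iteration(executor, None, "TI", send_ok=False)
print(state)  # 'scheduled' -- send failed, entry left in queued_tasks
state = scheduler_iteration(executor, state, "TI", send_ok=True)
print(state)  # still 'scheduled' -- the stale entry blocks re-queueing
   ```
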
   The only workaround currently is to restart the scheduler. When the scheduler is restarted, `CeleryExecutor.queued_tasks` is reset and therefore `TI` is `queued` again.
   
   The code where the `queued_tasks` entry is updated by popping the `TI` is here:
   
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/executors/celery_executor.py#L223
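
   For reference, the behaviour around that line can be paraphrased roughly as in the sketch below (a simplified stand-in based on the description above, not the verbatim Airflow source): the entry is popped from `queued_tasks` only when the send succeeds, so both the exception path and the `result is None` path leave it behind.

   ```python
# Simplified paraphrase of the pop-on-success logic (illustrative only).
# `results` holds (key, command, result) tuples, where `result` is the
# Celery AsyncResult on success, an exception wrapper on failure, or None.
def process_send_results(queued_tasks, running, results):
    for key, command, result in results:
        if isinstance(result, Exception):
            # Send raised: log and leave the entry in queued_tasks,
            # expecting the scheduler to deal with it.
            print("Error sending Celery task: %s" % result)
        elif result is not None:
            # Success: the only branch that pops the entry from queued_tasks.
            running[key] = command
            queued_tasks.pop(key)
        # result is None: the entry also stays in queued_tasks


queued_tasks = {"TI": "airflow run ..."}
running = {}
process_send_results(queued_tasks, running, [("TI", "airflow run ...", None)])
print(queued_tasks)  # {'TI': 'airflow run ...'} -- the stale entry remains
   ```
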
   
   The check that causes `TI` to get stuck in the `scheduled` state is here:
   
https://github.com/apache/airflow/blob/a943d6beab473b8ec87bb8c6e43f93cc64fa1d23/airflow/jobs/scheduler_job.py#L1033
   
   I think the check here should only look at whether the `CeleryExecutor.running` dictionary has an entry for `TI`. But I am not sure how that change affects other executors.
   
   **What you expected to happen**:
   
   The `_find_executable_task_instances()` function should only check whether `CeleryExecutor.running` contains an entry for `TI`, and should return `TI` as part of its list of tasks to be queued.
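
   A minimal sketch of the difference between the current check and the proposed one (illustrative only; in the real code this check lives in `_find_executable_task_instances` at the link above):

   ```python
# Illustrative comparison of the current vs. proposed skip condition in
# _find_executable_task_instances (not the actual Airflow code).

def should_skip_current(queued_tasks, running, ti_key):
    # Current behaviour as described above: a TI whose entry is still in
    # queued_tasks after a failed send is skipped on every iteration.
    return ti_key in queued_tasks or ti_key in running


def should_skip_proposed(running, ti_key):
    # Proposed behaviour: only a TI that is actually running is skipped,
    # so a TI left behind in queued_tasks can be picked up again.
    return ti_key in running


queued_tasks = {"TI": "airflow run ..."}  # stale entry after a failed send
running = {}

print(should_skip_current(queued_tasks, running, "TI"))  # True  -> stuck
print(should_skip_proposed(running, "TI"))               # False -> re-queued
   ```
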
   
   **How to reproduce it**:
   
   It can be reproduced by forcing the `result` value in `CeleryExecutor.heartbeat` to be an `ExceptionTraceback` object or `None`.
   
   (Note: the links point to the `master` branch, but the problem applies to 1.10.3 and higher versions.)
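
   A hypothetical sketch of one way to force that path locally is shown below. It assumes Airflow is installed and monkeypatches `send_task_to_executor` from the linked `celery_executor.py` (the exact helper name and call site may differ between versions) so that the send result is always `None`.

   ```python
# Hypothetical reproduction sketch: force the "result is None" branch so the
# TI's entry is never popped from CeleryExecutor.queued_tasks.
# Assumes Airflow is installed; `send_task_to_executor` is taken from the
# linked celery_executor.py and may be named/located differently elsewhere.
from unittest import mock

import airflow.executors.celery_executor as celery_executor


def _send_returns_none(*args, **kwargs):
    # Simulate the failure branch: the send "completes" but yields no result.
    return None


with mock.patch.object(celery_executor, "send_task_to_executor",
                       _send_returns_none):
    # Run a scheduler loop in-process (hypothetical usage) against a DAG with
    # one runnable task. After the first iteration the TI should flip back to
    # `scheduled` while its key stays in CeleryExecutor.queued_tasks, and it
    # is then skipped on every later iteration until the scheduler restarts.
    from airflow.jobs.scheduler_job import SchedulerJob
    SchedulerJob(num_runs=2).run()
   ```
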
   
   **Anything else we need to know**:
   
   I am not able to see a scenario in which `result` is `None` in `CeleryExecutor.heartbeat`. Looking at the `Celery.app` module, it feels like `result` can never be `None`. But I suspect there are scenarios where `result` is `None` and therefore the `CeleryExecutor` does not pop the `TI` from the queue. I am not able to prove this concretely.
   
   This also happens with later versions of Airflow. In later versions, the `CeleryExecutor.trigger_dags` function performs the same set of operations. The code has been moved around between versions, but the logic remains the same, so the problem exists in later versions as well.
   
   
   
   

