Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1847312911 ## airflow/executors/base_executor.py: ## @@ -552,7 +554,12 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +@deprecated( Review Comment: sounds good -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jscheffl commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1847172248 ## airflow/executors/base_executor.py: ## @@ -552,7 +554,12 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +@deprecated( Review Comment: @dstandish Tried the new cherry-picker tool and made a back-port. Feel free to debate/block/approve there. I'd like to back-port so that the deprecation is made visible early. Additionally, the missing feature (in my view: a bug) is resolved in 2.10.4 as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1846890357 ## airflow/executors/base_executor.py: ## @@ -552,7 +554,12 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +@deprecated( Review Comment: i am not sure what this will be backported to. I think 2.11 is the most "by the book" but others have suggested it could or should maybe go to 2.10 @potiuk @jedcunningham -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jscheffl commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844943356 ## airflow/executors/base_executor.py: ## @@ -552,7 +554,12 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +@deprecated( Review Comment: @dstandish As this is a change of (public) API of BaseExecutor, are you back-porting this to `v2-10-test` branch (the Interface change) to know that other executor implementations need to adjust? Otherwise, is this PR in general a bug-fix that would be valid to be ported to 2.10-line of Airflow or is this treated as new feature for Airflow 3? (I see this as 70% a bug fix for operational problem improvement and 30% a feature that myself I always found missing but never made it to the idea to contribute myself). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish merged PR #43520: URL: https://github.com/apache/airflow/pull/43520 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844674828 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session -def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: +def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ -Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + +Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks -should not be stuck in queued for a long time. This will mark tasks stuck in -queued for longer than `self._task_queued_timeout` as failed. If the task has -available retries, it will be retried. +should not be stuck in queued for a long time. + +Originally, we simply marked a task as failed when it was stuck in queued for +too long. We found that this led to suboptimal outcomes as ideally we would like "failed" +to mean that a task was unable to run, instead of it meaning that we were unable to run the task. + +As a compromise between always failing a stuck task and always rescheduling a stuck task (which could Review Comment: yea -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844673286 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session -def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: +def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ -Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + +Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks -should not be stuck in queued for a long time. This will mark tasks stuck in -queued for longer than `self._task_queued_timeout` as failed. If the task has -available retries, it will be retried. +should not be stuck in queued for a long time. + +Originally, we simply marked a task as failed when it was stuck in queued for +too long. We found that this led to suboptimal outcomes as ideally we would like "failed" +to mean that a task was unable to run, instead of it meaning that we were unable to run the task. + +As a compromise between always failing a stuck task and always rescheduling a stuck task (which could +lead to tasks being stuck in queued forever without informing the user). """ -self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method") +tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session) +for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): +try: +for ti in stuck_tis: +executor.revoke_task(ti=ti) +self._maybe_requeue_stuck_ti( +ti=ti, +session=session, +) +except NotImplementedError: +# this block only gets entered if the executor has not implemented `revoke_task`. +# in which case, we try the fallback logic +# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0. +# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should +# just continue immediately. +self._stuck_in_queued_backcompat_logic(executor, stuck_tis) +continue -tasks_stuck_in_queued = session.scalars( +def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]: +"""Query db for TIs that are stuck in queued.""" +return session.scalars( select(TI).where( TI.state == TaskInstanceState.QUEUED, TI.queued_dttm < (timezone.utcnow() - timedelta(seconds=self._task_queued_timeout)), TI.queued_by_job_id == self.job.id, ) -).all() +) -for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): -try: -cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) -for ti in stuck_tis: -if repr(ti) in cleaned_up_task_instances: -self.log.warning( -"Marking task instance %s stuck in queued as failed. " -"If the task instance has available retries, it will be retried.", -ti, -) -session.add( -Log( -event="stuck in queued", -task_instance=ti.key, -extra=( -"Task will be marked as failed. If the task instance has " -"available retries, it will be retried." -), -) -) -except NotImplementedError: -self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") +def _maybe_requeue_stuck_ti(self, *, ti, session): +""" +Requeue task if it has not been attempted too many times. + +Otherwise, fail it. 
+""" +num_times_stuck = self._get_num_times_stuck_in_queued(ti, session) +if num_times_stuck < self._num_stuck_queued_retries: +self.log.info("Task stuck in queued; will try to requeue. task_id=%s", ti.task_id) +session.add( +Log( +event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844665285 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task return not_adopted_tis +@deprecated( +reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", Review Comment: Yeah. It doesn't really matter. Just feels a bit cleaner to me. We should at least state what version to upgrade to. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844671986 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task return not_adopted_tis +@deprecated( +reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", Review Comment: it would also only show up in scheduler logs, and only when stuck queued tasks are processed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844671484 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task return not_adopted_tis +@deprecated( +reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", Review Comment: we don't know what version this is gonna be released in, alas -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844658622 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task return not_adopted_tis +@deprecated( +reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", Review Comment: Why not deprecate? This warning would only appear if a user upgrades the Celery provider but not Airflow -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844644649 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session -def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: +def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ -Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + +Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks -should not be stuck in queued for a long time. This will mark tasks stuck in -queued for longer than `self._task_queued_timeout` as failed. If the task has -available retries, it will be retried. +should not be stuck in queued for a long time. + +Originally, we simply marked a task as failed when it was stuck in queued for +too long. We found that this led to suboptimal outcomes as ideally we would like "failed" +to mean that a task was unable to run, instead of it meaning that we were unable to run the task. + +As a compromise between always failing a stuck task and always rescheduling a stuck task (which could Review Comment: We don't say what the compromise is. Should reword this a bit. ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[Task return not_adopted_tis +@deprecated( +reason="Replaced by function `revoke_task`. Upgrade airflow core to make this go away.", Review Comment: hmm, I wonder if we should instead not deprecate this, but leave a comment that it can be removed when the min airflow version for providers is 2.11? ## airflow/jobs/scheduler_job_runner.py: ## @@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session -def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: +def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ -Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + +Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks -should not be stuck in queued for a long time. This will mark tasks stuck in -queued for longer than `self._task_queued_timeout` as failed. If the task has -available retries, it will be retried. +should not be stuck in queued for a long time. + +Originally, we simply marked a task as failed when it was stuck in queued for +too long. We found that this led to suboptimal outcomes as ideally we would like "failed" +to mean that a task was unable to run, instead of it meaning that we were unable to run the task. + +As a compromise between always failing a stuck task and always rescheduling a stuck task (which could +lead to tasks being stuck in queued forever without informing the user). 
""" -self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued method") +tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session) +for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): +try: +for ti in stuck_tis: +executor.revoke_task(ti=ti) +self._maybe_requeue_stuck_ti( +ti=ti, +session=session, +) +except NotImplementedError: +# this block only gets entered if the executor has not implemented `revoke_task`. +# in which case, we try the fallback logic +# todo: remove the call to _stuck_in_queued_backcompat_logic in airflow 3.0. +# after 3.0, `cleanup_stuck_queued_tasks` will be removed, so we should +# just continue immediately. +self._stuck_in_queued_backcompat_logic(executor, stuck_tis) +continue -tasks_stuck_in_queued = session.scalars( +def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]: +"""Query db for TIs that are stuck in queu
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844406625 ## airflow/config_templates/config.yml: ## @@ -2331,6 +2331,14 @@ scheduler: type: integer example: ~ default: "8974" +num_stuck_in_queued_retries: Review Comment: Alright. I have made this an undocumented param. Hopefully this pleases everyone. If a sophisticated user (like an airflow-as-a-service provider) wants to customize this, they can. But for the average user, it is invisible. And making it configurable-albeit-hidden is a bonus for test readability because we can control exactly how many times it should retry, and this is important to control for testing. Resolving convo. PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
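For context, an undocumented option can still be overridden through Airflow's standard configuration mechanisms; the section and key below are assumed from this discussion, and the values are illustrative only:

```python
import os

# Override the hidden option for a deployment via the standard AIRFLOW__SECTION__KEY convention
# (section/key assumed from the PR discussion).
os.environ["AIRFLOW__SCHEDULER__NUM_STUCK_IN_QUEUED_RETRIES"] = "5"

# In Airflow's own test suite, something like the conf_vars context manager could pin it instead
# (helper path assumed):
# from tests.test_utils.config import conf_vars
# with conf_vars({("scheduler", "num_stuck_in_queued_retries"): "1"}):
#     ...  # exercise the stuck-in-queued handling with exactly one requeue allowed
```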
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
eladkal commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1844191148 ## airflow/config_templates/config.yml: ## @@ -2331,6 +2331,14 @@ scheduler: type: integer example: ~ default: "8974" +num_stuck_in_queued_retries: Review Comment: > so yeah, these are very much different things. do we need num_stuck_in_queued_retries to be configurable? not necessarily. That is where I was going. This sounds more like an airflow-internal mechanism. I think we'd better not expose this and possibly cause confusion about how the settings interact and what they are about -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
collinmcnulty commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1842556180 ## airflow/config_templates/config.yml: ## @@ -2331,6 +2331,14 @@ scheduler: type: integer example: ~ default: "8974" +num_stuck_in_queued_retries: Review Comment: task queued timeout consumes the retries on the task as set by the dag author. This is a problem when the task is not idempotent and therefore deliberately has retries set to 0. But the particular failure of a task to even get to running should not actually consume the task's retries that are for the running of the task. So we still need the timeout setting, but we also need a separate counter for attempts to get from queued to running, whereas retries are attempts to get from running to success. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
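To illustrate the distinction described above, here is a hypothetical DAG (dag id, task id, and command are invented) whose author deliberately sets `retries=0` because the task is not idempotent; under the behaviour proposed in this PR, a stuck-in-queued requeue would not consume that retry budget, since the task never actually started running:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="non_idempotent_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    BashOperator(
        task_id="charge_card",
        bash_command="echo 'charging the customer card exactly once'",
        # retries governs running -> success attempts only; queued -> running requeues
        # are the scheduler's concern and should not eat into this budget.
        retries=0,
    )
```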
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1842560376 ## airflow/config_templates/config.yml: ## @@ -2331,6 +2331,14 @@ scheduler: type: integer example: ~ default: "8974" +num_stuck_in_queued_retries: Review Comment: so yeah, these are very much different things. do we need `num_stuck_in_queued_retries` to be configurable? not necessarily. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
eladkal commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1841650852 ## airflow/config_templates/config.yml: ## @@ -2331,6 +2331,14 @@ scheduler: type: integer example: ~ default: "8974" +num_stuck_in_queued_retries: Review Comment: I am late to join the party but I don't follow why we need this. We have [AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-queued-timeout), which was introduced in Airflow 2.6 and which means that a task retries after the timeout has passed. If for some reason this setting isn't doing that now, then we should fix it rather than adding a new setting. `task_queued_timeout` + `num_stuck_in_queued_retries` feels too complicated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
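For clarity on how the two knobs relate, a rough sketch follows (option names taken from this thread; the retries option ultimately stayed undocumented): the timeout decides *when* a queued task counts as stuck, while the retries value decides *how many* requeue attempts happen before the task is failed.

```python
from airflow.configuration import conf

# How long a task may sit in queued before the scheduler intervenes (seconds).
task_queued_timeout = conf.getfloat("scheduler", "task_queued_timeout")

# How many times a stuck task is pushed back to SCHEDULED before being failed
# (hidden/undocumented option; the default used here is an assumption).
num_stuck_in_queued_retries = conf.getint("scheduler", "num_stuck_in_queued_retries", fallback=2)
```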
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1835163879 ## providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py: ## @@ -443,6 +446,23 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: if str(e.status) != "404": raise +def patch_pod_delete_stuck(self, *, pod_name: str, namespace: str): +"""Add a "done" annotation to ensure we don't continually adopt pods.""" Review Comment: fixed ## airflow/jobs/scheduler_job_runner.py: ## @@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) ).all() +num_allowed_retries = conf.getint("scheduler", "num_stuck_in_queued_retries") for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): -try: -cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) -for ti in stuck_tis: -if repr(ti) in cleaned_up_task_instances: -self.log.warning( -"Marking task instance %s stuck in queued as failed. " -"If the task instance has available retries, it will be retried.", -ti, +tis: Iterable[TaskInstance] = [] +with suppress(NotImplementedError): +# BaseExecutor has "abstract" method `cleanup_stuck_queued_tasks` +# We are tolerant of implementers not implementing it. +tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis) +for ti in tis: +if not isinstance(ti, TaskInstance): +# todo: when can we remove this? +# this is for backcompat. the pre-2.10.4 version of the interface +# expected a string return val. +self.log.warning( +"Task instance %s stuck in queued. May be set to failed.", +ti, +) +continue + +self.log.warning("Task stuck in queued and may be requeued. task_id=%s", ti.key) + +num_times_stuck = self._get_num_times_stuck_in_queued(ti, session) +if num_times_stuck < num_allowed_retries: +session.add( +Log( +event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT, +task_instance=ti.key, +extra=( +f"Task was in queued state for longer than {self._task_queued_timeout} " +"seconds; task state will be set back to scheduled." +), ) -session.add( -Log( -event="stuck in queued", -task_instance=ti.key, -extra=( -"Task will be marked as failed. If the task instance has " -"available retries, it will be retried." -), -) +) +with suppress(KeyError): +executor.running.remove(ti.key) +self._reschedule_stuck_task(ti) +else: +self.log.warning( +"Task requeue attempts exceeded max; marking failed. task_instance=%s", ti +) +session.add( +Log( +event="stuck in queued tries exceeded", +task_instance=ti.key, +extra=( +f"Task was requeued more than {num_allowed_retries} times " +"and will be failed." +), ) -except NotImplementedError: -self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") +) +executor.fail(ti.key) + +@provide_session +def _reschedule_stuck_task(self, ti, session=NEW_SESSION): +session.execute( +update(TI) +.where(TI.filter_for_tis([ti])) +.values( +state=TaskInstanceState.SCHEDULED, +queued_dttm=None, +) +.execution_options(synchronize_session=False) +) + +@provide_session +def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: +""" +Check the Log table to see how many times a
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1835163761 ## airflow/executors/base_executor.py: ## @@ -540,7 +540,9 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +def cleanup_stuck_queued_tasks( +self, tis: list[TaskInstance] +) -> Iterable[TaskInstance]: # pragma: no cover Review Comment: outdated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1835163642 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) ).all() +num_allowed_retries = conf.getint("scheduler", "num_stuck_in_queued_retries") for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): -try: -cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) -for ti in stuck_tis: -if repr(ti) in cleaned_up_task_instances: -self.log.warning( -"Marking task instance %s stuck in queued as failed. " -"If the task instance has available retries, it will be retried.", -ti, +tis: Iterable[TaskInstance] = [] +with suppress(NotImplementedError): +# BaseExecutor has "abstract" method `cleanup_stuck_queued_tasks` +# We are tolerant of implementers not implementing it. +tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis) +for ti in tis: +if not isinstance(ti, TaskInstance): +# todo: when can we remove this? +# this is for backcompat. the pre-2.10.4 version of the interface +# expected a string return val. +self.log.warning( +"Task instance %s stuck in queued. May be set to failed.", Review Comment: outdated; we added new func ## airflow/jobs/scheduler_job_runner.py: ## @@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) ).all() +num_allowed_retries = conf.getint("scheduler", "num_stuck_in_queued_retries") for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): -try: -cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) -for ti in stuck_tis: -if repr(ti) in cleaned_up_task_instances: -self.log.warning( -"Marking task instance %s stuck in queued as failed. " -"If the task instance has available retries, it will be retried.", -ti, +tis: Iterable[TaskInstance] = [] +with suppress(NotImplementedError): +# BaseExecutor has "abstract" method `cleanup_stuck_queued_tasks` +# We are tolerant of implementers not implementing it. +tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis) +for ti in tis: +if not isinstance(ti, TaskInstance): +# todo: when can we remove this? +# this is for backcompat. the pre-2.10.4 version of the interface +# expected a string return val. +self.log.warning( +"Task instance %s stuck in queued. May be set to failed.", +ti, +) +continue + +self.log.warning("Task stuck in queued and may be requeued. task_id=%s", ti.key) Review Comment: updated -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1835154201 ## airflow/executors/base_executor.py: ## @@ -540,7 +540,9 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +def cleanup_stuck_queued_tasks( +self, tis: list[TaskInstance] +) -> Iterable[TaskInstance]: # pragma: no cover """ Handle remnants of tasks that were failed because they were stuck in queued. Review Comment: no longer changing this function's behavior -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1835018068 ## airflow/executors/base_executor.py: ## @@ -540,7 +540,9 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +def cleanup_stuck_queued_tasks( +self, tis: list[TaskInstance] +) -> Iterable[TaskInstance]: # pragma: no cover """ Handle remnants of tasks that were failed because they were stuck in queued. Review Comment: We should reword this docstring too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1834877987 ## airflow/executors/base_executor.py: ## @@ -540,7 +540,9 @@ def terminate(self): """Get called when the daemon receives a SIGTERM.""" raise NotImplementedError -def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: # pragma: no cover +def cleanup_stuck_queued_tasks( +self, tis: list[TaskInstance] +) -> Iterable[TaskInstance]: # pragma: no cover Review Comment: I'm a bit concerned about this return type change (str -> TI), and also the behavior change for this method. BaseExecutor is public, so we have to consider executors beyond those in our repos. And today, we expect other executors to fail tasks in this method. ## providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py: ## @@ -443,6 +446,23 @@ def delete_pod(self, pod_name: str, namespace: str) -> None: if str(e.status) != "404": raise +def patch_pod_delete_stuck(self, *, pod_name: str, namespace: str): +"""Add a "done" annotation to ensure we don't continually adopt pods.""" Review Comment: docstring was never updated after copy/paste ## airflow/jobs/scheduler_job_runner.py: ## @@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) ).all() +num_allowed_retries = conf.getint("scheduler", "num_stuck_in_queued_retries") for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): -try: -cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) -for ti in stuck_tis: -if repr(ti) in cleaned_up_task_instances: -self.log.warning( -"Marking task instance %s stuck in queued as failed. " -"If the task instance has available retries, it will be retried.", -ti, +tis: Iterable[TaskInstance] = [] +with suppress(NotImplementedError): +# BaseExecutor has "abstract" method `cleanup_stuck_queued_tasks` +# We are tolerant of implementers not implementing it. +tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis) +for ti in tis: +if not isinstance(ti, TaskInstance): +# todo: when can we remove this? +# this is for backcompat. the pre-2.10.4 version of the interface +# expected a string return val. +self.log.warning( +"Task instance %s stuck in queued. May be set to failed.", Review Comment: If we get a str back, shouldn't we assume it was set to failed? That's the current expectation, right? ## airflow/jobs/scheduler_job_runner.py: ## @@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) ).all() +num_allowed_retries = conf.getint("scheduler", "num_stuck_in_queued_retries") for executor, stuck_tis in self._executor_to_tis(tasks_stuck_in_queued).items(): -try: -cleaned_up_task_instances = set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis)) -for ti in stuck_tis: -if repr(ti) in cleaned_up_task_instances: -self.log.warning( -"Marking task instance %s stuck in queued as failed. " -"If the task instance has available retries, it will be retried.", -ti, +tis: Iterable[TaskInstance] = [] +with suppress(NotImplementedError): +# BaseExecutor has "abstract" method `cleanup_stuck_queued_tasks` +# We are tolerant of implementers not implementing it. +tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis) +for ti in tis: +if not isinstance(ti, TaskInstance): +# todo: when can we remove this? +# this is for backcompat. the pre-2.10.4 version of the interface +# expected a string return val. +self.log.warning( +"Task instance %s stuck in queued. 
May be set to failed.", +ti, +) +continue + +self.log.warning("Task stuck in queued and may be requeued. task_id=%s", ti.key) + +num_times_stuck = self._get_num_times_stuck_in_queued(ti, session) +if num_times_stuck < num_allowed_retries: +session.add( +
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dimberman commented on PR #43520: URL: https://github.com/apache/airflow/pull/43520#issuecomment-2465299664 I have tested this PR by doing the following: 1. Created two Airflow deployments in Helm, one Celery and one KubernetesExecutor, and lowered the queued timeout to 10 seconds 2. Ensured that both Airflows ran correctly 3. For the CeleryExecutor I then removed the workers and ensured that the correct logs showed up 4. For the KubernetesExecutor I modified the pod_template_file to ensure that the pod would be stuck in pending. In the same fashion, the following logs showed up, indicating that the scheduler attempted to restart the queued task 3 times before failing: https://github.com/user-attachments/assets/dd5d700a-a526-48b8-999f-f3829c7526e2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1829989061 ## providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py: ## @@ -636,6 +638,10 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: self.log.warning("Cannot find pod for ti %s", ti) continue readable_tis.append(repr(ti)) +if Version(airflow_version) < Version("2.10.4"): Review Comment: should this be a greater than? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
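For reference, the pattern in question gates provider behaviour on the installed Airflow core version; a minimal sketch is below (whether the comparison should be `<` or flipped is exactly what the review question raises, so no stance is taken here):

```python
from packaging.version import Version

from airflow import __version__ as airflow_version

# On one side of the 2.10.4 boundary the executor must keep the legacy behaviour of failing
# the stuck task itself; on the other, the scheduler decides whether to requeue or fail it.
RUNNING_ON_LEGACY_CORE = Version(airflow_version) < Version("2.10.4")
```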
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1827890135 ## airflow/config_templates/config.yml: ## @@ -386,6 +386,14 @@ core: type: integer example: ~ default: "30" +num_stuck_reschedules: + description: | +Number of times Airflow will reschedule a task that gets stuck in queued before marking the +task as failed + version_added: 2.10.0 Review Comment: ```suggestion version_added: 2.10.4 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826231840 ## airflow/jobs/scheduler_job_runner.py: ## @@ -98,6 +98,8 @@ DR = DagRun DM = DagModel +RETRY_STUCK_IN_QUEUED_EVENT = "retrying stuck in queued" Review Comment: ```suggestion RESCHEDULE_STUCK_IN_QUEUED_EVENT = "rescheduling stuck in queued" ``` etc. ## airflow/jobs/scheduler_job_runner.py: ## @@ -1785,15 +1787,23 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, callback: DagCallbackReques self.log.debug("callback is empty") @provide_session -def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: +def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: """ -Mark tasks stuck in queued for longer than `task_queued_timeout` as failed. + +Handle the scenario where a task is queued for longer than `task_queued_timeout`. Tasks can get stuck in queued for a wide variety of reasons (e.g. celery loses track of a task, a cluster can't further scale up its workers, etc.), but tasks -should not be stuck in queued for a long time. This will mark tasks stuck in -queued for longer than `self._task_queued_timeout` as failed. If the task has -available retries, it will be retried. +should not be stuck in queued for a long time. + +Originally, we simply marked a task as failed when it was stuck in queued for +too long. We found that this led to suboptimal outcomes as ideally we would like "failed" +to mean that a task was unable to run, instead of it meaning that we were unable to run the task. + +As a compromise between always failing a stuck task and always rescheduling a stuck task (which could +lead to tasks being stuck in queued forever without informing the user), we have creating the config +`AIRFLOW__CORE__NUM_STUCK_RETRIES`. With this new configuration, an airflow admin can decide how Review Comment: ```suggestion `[core] num_stuck_reschedules`. With this new configuration, an airflow admin can decide how ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dimberman commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826198858 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: for ti in tis: readable_tis.append(repr(ti)) task_instance_key = ti.key -self.fail(task_instance_key, None) Review Comment: @dstandish yep added exactly that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826132005 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: for ti in tis: readable_tis.append(repr(ti)) task_instance_key = ti.key -self.fail(task_instance_key, None) Review Comment: it's possible in the provider to inspect the airflow version and handle conditionally -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dimberman commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826114963 ## uv.lock: ## Review Comment: Oh weird, I explicitly deleted this and committed, I'll add that to my global gitignore -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dimberman commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826114221 ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: for ti in tis: readable_tis.append(repr(ti)) task_instance_key = ti.key -self.fail(task_instance_key, None) Review Comment: @jedcunningham can you explain a bit further? Is there a workaround we can consider? The reason we did this is because the idea of failing the task and then setting it to scheduled after felt fraught with potential bugs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
jedcunningham commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826054815 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1817,17 +1828,59 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) session.add( Log( -event="stuck in queued", +event=RETRY_STUCK_IN_QUEUED_EVENT, task_instance=ti.key, extra=( -"Task will be marked as failed. If the task instance has " +f"Task was stuck in queued and will be retried, once it has hit {num_allowed_retries} attempts" Review Comment: ```suggestion f"Task was stuck in queued and will be requeued, once it has hit {num_allowed_retries} attempts" ``` It might be good to avoid "retry" here to avoid confusion between dag-author-configured-retries and these retries. ## airflow/jobs/scheduler_job_runner.py: ## @@ -1805,6 +1815,7 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ) ).all() +num_allowed_retries = conf.getint("core", "num_stuck_retries", fallback=2) Review Comment: ```suggestion num_allowed_retries = conf.getint("core", "num_stuck_retries") ``` You need to add this to [config.yml](https://github.com/apache/airflow/blob/main/airflow/config_templates/config.yml) (and then won't need a fallback). ## uv.lock: ## Review Comment: I assume you didn't intend to add this :) ## providers/src/airflow/providers/celery/executors/celery_executor.py: ## @@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]: for ti in tis: readable_tis.append(repr(ti)) task_instance_key = ti.key -self.fail(task_instance_key, None) Review Comment: This is problematic - folks could install the provider with this code onto 2.8. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dimberman commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826037304 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1822,9 +1822,30 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ), ) ) Review Comment: Yes I've created separate logs for rescheduling and failing of tasks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dimberman commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1826037677 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1822,9 +1822,33 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ), ) ) +num_times_stuck = self._get_num_times_stuck(ti, session) +if num_times_stuck < conf.getint("core", "num_stuck_retries", fallback=3): +self._reset_task_instance(ti, session) +else: +executor.fail(ti.key) + + except NotImplementedError: self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") +@provide_session +def _get_num_times_stuck(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: +return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id) +.where(Log.dag_id == ti.dag_id) +.where(Log.run_id == ti.run_id) +.where(Log.map_index == ti.map_index) +.where(Log.try_number == ti.try_number) +.where(Log.event == "stuck in queued") Review Comment: Agreed, was on my list to do before taking PR out of draft :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1823520481 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1822,9 +1822,33 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ), ) ) +num_times_stuck = self._get_num_times_stuck(ti, session) +if num_times_stuck < conf.getint("core", "num_stuck_retries", fallback=3): +self._reset_task_instance(ti, session) +else: +executor.fail(ti.key) + + except NotImplementedError: self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") +@provide_session +def _get_num_times_stuck(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: +return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id) +.where(Log.dag_id == ti.dag_id) +.where(Log.run_id == ti.run_id) +.where(Log.map_index == ti.map_index) +.where(Log.try_number == ti.try_number) +.where(Log.event == "stuck in queued") Review Comment: simpler / cleaner i think to just use the `*args` e.g. ```suggestion .where( Log.dag_id == ti.dag_id, Log.run_id == ti.run_id, Log.map_index == ti.map_index, Log.try_number == ti.try_number, Log.event == "stuck in queued", ) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1823518651 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1822,9 +1822,33 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ), ) ) +num_times_stuck = self._get_num_times_stuck(ti, session) +if num_times_stuck < conf.getint("core", "num_stuck_retries", fallback=3): +self._reset_task_instance(ti, session) +else: +executor.fail(ti.key) + + except NotImplementedError: self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") +@provide_session +def _get_num_times_stuck(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: +return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id) +.where(Log.dag_id == ti.dag_id) +.where(Log.run_id == ti.run_id) +.where(Log.map_index == ti.map_index) +.where(Log.try_number == ti.try_number) +.where(Log.event == "stuck in queued") Review Comment: should be constant of some kind to ensure that the events are created / looked up with same text -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
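A small sketch of that suggestion follows, with one shared constant used both where the Log row is written and where prior occurrences are counted; the constant's string value and the exact query shape are assumptions for illustration, not the PR's final code:

```python
from sqlalchemy import func, select

from airflow.models import Log

# Single source of truth for the event name (string value assumed here).
TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"

def count_stuck_in_queued_events(ti, session) -> int:
    """Count how many times this exact try of the task instance was logged as stuck in queued."""
    return session.scalar(
        select(func.count()).where(
            Log.task_id == ti.task_id,
            Log.dag_id == ti.dag_id,
            Log.run_id == ti.run_id,
            Log.map_index == ti.map_index,
            Log.try_number == ti.try_number,
            Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
        )
    )
```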
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
o-nikolas commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1823354279 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1822,9 +1822,30 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ), ) ) Review Comment: Can you update the wording in the logging and docstring above, given the new behaviour. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] Allow for retry when tasks are stuck in queued [airflow]
dstandish commented on code in PR #43520: URL: https://github.com/apache/airflow/pull/43520#discussion_r1823096131 ## airflow/jobs/scheduler_job_runner.py: ## @@ -1822,9 +1822,30 @@ def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None: ), ) ) +num_times_stuck = self._get_num_times_stuck(ti, session) +if num_times_stuck < conf.getint("core", "num_stuck_retries", fallback=3): +self._reset_task_instance(ti, session) + + except NotImplementedError: self.log.debug("Executor doesn't support cleanup of stuck queued tasks. Skipping.") +@provide_session +def _get_num_times_stuck(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int: +return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id) +.where(Log.dag_id == ti.dag_id) +.where(Log.run_id == ti.run_id) +.where(Log.map_index == ti.map_index) +.where(Log.try_number == ti.try_number) +.where(Log.event == "stuck in queued") +)) + +def _reset_task_instance(self, ti: TaskInstance, session: Session = NEW_SESSION): +ti.external_executor_id = None +ti.state = State.SCHEDULED +session.add(ti) Review Comment: Merge not add -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org