Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-18 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1847312911


##
airflow/executors/base_executor.py:
##
@@ -552,7 +554,12 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+@deprecated(

Review Comment:
   sounds good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-18 Thread via GitHub


jscheffl commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1847172248


##
airflow/executors/base_executor.py:
##
@@ -552,7 +554,12 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+@deprecated(

Review Comment:
   @dstandish Tried the new cherry-picker tool and made a back-port. Feel free to debate/block/approve there.
   
   I'd like to back-port so that the deprecation is made visible early. Additionally, the missing feature (in my view: a bug) is resolved in 2.10.4 as well.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-18 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1846890357


##
airflow/executors/base_executor.py:
##
@@ -552,7 +554,12 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+@deprecated(

Review Comment:
   I am not sure what this will be backported to. I think 2.11 is the most "by the book", but others have suggested it could or should maybe go to 2.10. @potiuk @jedcunningham 






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-16 Thread via GitHub


jscheffl commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844943356


##
airflow/executors/base_executor.py:
##
@@ -552,7 +554,12 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+@deprecated(

Review Comment:
   @dstandish 
   
   As this is a change to the (public) API of BaseExecutor, are you back-porting this (the interface change) to the `v2-10-test` branch so that other executor implementations know they need to adjust?
   
   Otherwise, is this PR in general a bug fix that would be valid to port to the 2.10 line of Airflow, or is it treated as a new feature for Airflow 3? (I see it as 70% a bug fix for an operational problem and 30% a feature that I always found missing but never got around to contributing myself.)







Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish merged PR #43520:
URL: https://github.com/apache/airflow/pull/43520





Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844674828


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, 
callback: DagCallbackReques
 self.log.debug("callback is empty")
 
 @provide_session
-def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
 """
-Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+
+Handle the scenario where a task is queued for longer than 
`task_queued_timeout`.
 
 Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
 track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
-should not be stuck in queued for a long time. This will mark tasks 
stuck in
-queued for longer than `self._task_queued_timeout` as failed. If the 
task has
-available retries, it will be retried.
+should not be stuck in queued for a long time.
+
+Originally, we simply marked a task as failed when it was stuck in 
queued for
+too long. We found that this led to suboptimal outcomes as ideally we 
would like "failed"
+to mean that a task was unable to run, instead of it meaning that we 
were unable to run the task.
+
+As a compromise between always failing a stuck task and always 
rescheduling a stuck task (which could

Review Comment:
   yea






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844673286


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, 
callback: DagCallbackReques
 self.log.debug("callback is empty")
 
 @provide_session
-def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
 """
-Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+
+Handle the scenario where a task is queued for longer than 
`task_queued_timeout`.
 
 Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
 track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
-should not be stuck in queued for a long time. This will mark tasks 
stuck in
-queued for longer than `self._task_queued_timeout` as failed. If the 
task has
-available retries, it will be retried.
+should not be stuck in queued for a long time.
+
+Originally, we simply marked a task as failed when it was stuck in 
queued for
+too long. We found that this led to suboptimal outcomes as ideally we 
would like "failed"
+to mean that a task was unable to run, instead of it meaning that we 
were unable to run the task.
+
+As a compromise between always failing a stuck task and always 
rescheduling a stuck task (which could
+lead to tasks being stuck in queued forever without informing the 
user).
 """
-self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued 
method")
+tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
+for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
+try:
+for ti in stuck_tis:
+executor.revoke_task(ti=ti)
+self._maybe_requeue_stuck_ti(
+ti=ti,
+session=session,
+)
+except NotImplementedError:
+# this block only gets entered if the executor has not 
implemented `revoke_task`.
+# in which case, we try the fallback logic
+# todo: remove the call to _stuck_in_queued_backcompat_logic 
in airflow 3.0.
+#   after 3.0, `cleanup_stuck_queued_tasks` will be removed, 
so we should
+#   just continue immediately.
+self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
+continue
 
-tasks_stuck_in_queued = session.scalars(
+def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
+"""Query db for TIs that are stuck in queued."""
+return session.scalars(
 select(TI).where(
 TI.state == TaskInstanceState.QUEUED,
 TI.queued_dttm < (timezone.utcnow() - 
timedelta(seconds=self._task_queued_timeout)),
 TI.queued_by_job_id == self.job.id,
 )
-).all()
+)
 
-for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-try:
-cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-for ti in stuck_tis:
-if repr(ti) in cleaned_up_task_instances:
-self.log.warning(
-"Marking task instance %s stuck in queued as 
failed. "
-"If the task instance has available retries, it 
will be retried.",
-ti,
-)
-session.add(
-Log(
-event="stuck in queued",
-task_instance=ti.key,
-extra=(
-"Task will be marked as failed. If the 
task instance has "
-"available retries, it will be retried."
-),
-)
-)
-except NotImplementedError:
-self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
+def _maybe_requeue_stuck_ti(self, *, ti, session):
+"""
+Requeue task if it has not been attempted too many times.
+
+Otherwise, fail it.
+"""
+num_times_stuck = self._get_num_times_stuck_in_queued(ti, session)
+if num_times_stuck < self._num_stuck_queued_retries:
+self.log.info("Task stuck in queued; will try to requeue. 
task_id=%s", ti.task_id)
+session.add(
+Log(
+event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT

Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844665285


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
 
 return not_adopted_tis
 
+@deprecated(
+reason="Replaced by function `revoke_task`. Upgrade airflow core to 
make this go away.",

Review Comment:
   Yeah. It doesn't really matter. Just feels a bit cleaner to me.
   
   We should at least state what version to upgrade to.
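   A minimal sketch (editorial, not the PR's wording) of how a version hint could be phrased in the deprecation reason, assuming the `deprecated` package decorator shown in the diff above; the concrete version is a placeholder, since per the next comment the target release was not yet known:

```python
# Illustrative only: the class name and the "2.10.4+" hint are placeholders.
from deprecated import deprecated


class SomeCeleryLikeExecutor:
    @deprecated(
        reason=(
            "Replaced by `revoke_task`. Upgrade Airflow core to the release that "
            "ships `revoke_task` (e.g. 2.10.4+) to make this warning go away."
        ),
        category=DeprecationWarning,
    )
    def cleanup_stuck_queued_tasks(self, tis):
        ...
```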






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844671986


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
 
 return not_adopted_tis
 
+@deprecated(
+reason="Replaced by function `revoke_task`. Upgrade airflow core to 
make this go away.",

Review Comment:
   it would also only show up in scheduler logs, and only when stuck queued tasks are processed






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844671484


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
 
 return not_adopted_tis
 
+@deprecated(
+reason="Replaced by function `revoke_task`. Upgrade airflow core to 
make this go away.",

Review Comment:
   we don't know what version this is gonna be released in, alas






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844658622


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
 
 return not_adopted_tis
 
+@deprecated(
+reason="Replaced by function `revoke_task`. Upgrade airflow core to 
make this go away.",

Review Comment:
   Why not deprecate? This warning would only appear if the user upgrades the Celery provider but not Airflow.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844644649


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, 
callback: DagCallbackReques
 self.log.debug("callback is empty")
 
 @provide_session
-def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
 """
-Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+
+Handle the scenario where a task is queued for longer than 
`task_queued_timeout`.
 
 Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
 track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
-should not be stuck in queued for a long time. This will mark tasks 
stuck in
-queued for longer than `self._task_queued_timeout` as failed. If the 
task has
-available retries, it will be retried.
+should not be stuck in queued for a long time.
+
+Originally, we simply marked a task as failed when it was stuck in 
queued for
+too long. We found that this led to suboptimal outcomes as ideally we 
would like "failed"
+to mean that a task was unable to run, instead of it meaning that we 
were unable to run the task.
+
+As a compromise between always failing a stuck task and always 
rescheduling a stuck task (which could

Review Comment:
   We don't say what the compromise is. Should reword this a bit.



##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -433,31 +434,34 @@ def try_adopt_task_instances(self, tis: 
Sequence[TaskInstance]) -> Sequence[Task
 
 return not_adopted_tis
 
+@deprecated(
+reason="Replaced by function `revoke_task`. Upgrade airflow core to 
make this go away.",

Review Comment:
   hmm, I wonder if we should instead not deprecate this, but leave a comment 
that it can be removed when the min airflow version for providers is 2.11?



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1767,48 +1780,137 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, 
callback: DagCallbackReques
 self.log.debug("callback is empty")
 
 @provide_session
-def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
 """
-Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+
+Handle the scenario where a task is queued for longer than 
`task_queued_timeout`.
 
 Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
 track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
-should not be stuck in queued for a long time. This will mark tasks 
stuck in
-queued for longer than `self._task_queued_timeout` as failed. If the 
task has
-available retries, it will be retried.
+should not be stuck in queued for a long time.
+
+Originally, we simply marked a task as failed when it was stuck in 
queued for
+too long. We found that this led to suboptimal outcomes as ideally we 
would like "failed"
+to mean that a task was unable to run, instead of it meaning that we 
were unable to run the task.
+
+As a compromise between always failing a stuck task and always 
rescheduling a stuck task (which could
+lead to tasks being stuck in queued forever without informing the 
user).
 """
-self.log.debug("Calling SchedulerJob._fail_tasks_stuck_in_queued 
method")
+tasks_stuck_in_queued = self._get_tis_stuck_in_queued(session)
+for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
+try:
+for ti in stuck_tis:
+executor.revoke_task(ti=ti)
+self._maybe_requeue_stuck_ti(
+ti=ti,
+session=session,
+)
+except NotImplementedError:
+# this block only gets entered if the executor has not 
implemented `revoke_task`.
+# in which case, we try the fallback logic
+# todo: remove the call to _stuck_in_queued_backcompat_logic 
in airflow 3.0.
+#   after 3.0, `cleanup_stuck_queued_tasks` will be removed, 
so we should
+#   just continue immediately.
+self._stuck_in_queued_backcompat_logic(executor, stuck_tis)
+continue
 
-tasks_stuck_in_queued = session.scalars(
+def _get_tis_stuck_in_queued(self, session) -> Iterable[TaskInstance]:
+"""Query db for TIs that are stuck in queu

Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844406625


##
airflow/config_templates/config.yml:
##
@@ -2331,6 +2331,14 @@ scheduler:
   type: integer
   example: ~
   default: "8974"
+num_stuck_in_queued_retries:

Review Comment:
   Alright. I have made this an undocumented param. Hopefully this pleases everyone. If a sophisticated user (like an airflow-as-a-service provider) wanted to customize this, they can. But for the average user, it is invisible.
   
   And making it configurable-albeit-hidden is a bonus for test readability, because we can control exactly how many times it should retry, and this is important to control for testing. Resolving convo. PTAL
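   A minimal sketch of the test-control point above, assuming Airflow's standard `AIRFLOW__{SECTION}__{KEY}` environment-variable override; the test body is illustrative, not from this PR:

```python
import os
from unittest import mock


# Pin the hidden option so the test knows exactly how many requeue attempts are allowed.
@mock.patch.dict(os.environ, {"AIRFLOW__SCHEDULER__NUM_STUCK_IN_QUEUED_RETRIES": "1"})
def test_stuck_task_is_requeued_exactly_once():
    from airflow.configuration import conf

    # With the override in place, the scheduler would allow exactly one requeue
    # before failing a task that stays stuck in queued.
    assert conf.getint("scheduler", "num_stuck_in_queued_retries") == 1
```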






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-15 Thread via GitHub


eladkal commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1844191148


##
airflow/config_templates/config.yml:
##
@@ -2331,6 +2331,14 @@ scheduler:
   type: integer
   example: ~
   default: "8974"
+num_stuck_in_queued_retries:

Review Comment:
   > so yeah, these are very much different things.
   > do we need num_stuck_in_queued_retries to be configurable? not necessarily.
   
   That is where I was going. This sounds more like an airflow-internal mechanism. I think we'd better not expose this and possibly cause confusion about how the settings interact and what they are about.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-14 Thread via GitHub


collinmcnulty commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1842556180


##
airflow/config_templates/config.yml:
##
@@ -2331,6 +2331,14 @@ scheduler:
   type: integer
   example: ~
   default: "8974"
+num_stuck_in_queued_retries:

Review Comment:
   task queued timeout consumes the retries on the task as set by the dag 
author. This is a problem when the task is not idempotent and therefore 
deliberately has retries set to 0. But the particular failure of a task to even 
get to running should not actually consume the task's retries that are for the 
running of the task. So we still need the timeout setting, but we also need a 
separate counter for attempts to get from queued to running, whereas retries 
are attempts to get from running to success.
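   A minimal sketch of the distinction being described (editorial; names and numbers are illustrative, not the PR's code): requeue attempts count failures to get from queued to running and do not touch the dag author's `retries`, which only govern failures once the task has actually run.

```python
from dataclasses import dataclass


@dataclass
class StuckTask:
    times_stuck_in_queued: int  # how often this try has timed out while still queued
    try_number: int             # consumed only by real (post-start) failures


def handle_stuck(task: StuckTask, num_stuck_in_queued_retries: int = 2) -> str:
    if task.times_stuck_in_queued < num_stuck_in_queued_retries:
        # Put the task back to scheduled; the dag author's retries are untouched.
        return "requeue"
    # Only now does the failure surface to the user (and consume a task retry, if any).
    return "fail"


print(handle_stuck(StuckTask(times_stuck_in_queued=1, try_number=1)))  # requeue
print(handle_stuck(StuckTask(times_stuck_in_queued=2, try_number=1)))  # fail
```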






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-14 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1842560376


##
airflow/config_templates/config.yml:
##
@@ -2331,6 +2331,14 @@ scheduler:
   type: integer
   example: ~
   default: "8974"
+num_stuck_in_queued_retries:

Review Comment:
   so yeah, these are very much different things.
   do we need `num_stuck_in_queued_retries` to be configurable? not necessarily.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-13 Thread via GitHub


eladkal commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1841650852


##
airflow/config_templates/config.yml:
##
@@ -2331,6 +2331,14 @@ scheduler:
   type: integer
   example: ~
   default: "8974"
+num_stuck_in_queued_retries:

Review Comment:
   I am late to join the party but I don't follow why we need this. We have [AIRFLOW__SCHEDULER__TASK_QUEUED_TIMEOUT](https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#task-queued-timeout), which was introduced in Airflow 2.6 and which means that the task retries after the timeout has passed. If for some reason this setting isn't doing that now, then we should fix it rather than adding a new setting.
   
   `task_queued_timeout` + `num_stuck_in_queued_retries` feels too complicated.
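   For a back-of-envelope sense of how the two settings would compose under this PR (editorial sketch; numbers are illustrative): each queued attempt can wait up to `task_queued_timeout` before being detected as stuck, and the task is only failed once the allowed requeues are used up.

```python
task_queued_timeout = 600          # seconds, [scheduler] task_queued_timeout
num_stuck_in_queued_retries = 2    # requeue attempts before failing (proposed hidden option)

max_attempts = 1 + num_stuck_in_queued_retries
worst_case_seconds = max_attempts * task_queued_timeout
print(f"worst case before the task is failed: ~{worst_case_seconds}s over {max_attempts} queued attempts")
# -> worst case before the task is failed: ~1800s over 3 queued attempts
```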














Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1835163879


##
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##
@@ -443,6 +446,23 @@ def delete_pod(self, pod_name: str, namespace: str) -> 
None:
 if str(e.status) != "404":
 raise
 
+def patch_pod_delete_stuck(self, *, pod_name: str, namespace: str):
+"""Add a "done" annotation to ensure we don't continually adopt 
pods."""

Review Comment:
   fixed



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session 
= NEW_SESSION) -> None:
 )
 ).all()
 
+num_allowed_retries = conf.getint("scheduler", 
"num_stuck_in_queued_retries")
 for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-try:
-cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-for ti in stuck_tis:
-if repr(ti) in cleaned_up_task_instances:
-self.log.warning(
-"Marking task instance %s stuck in queued as 
failed. "
-"If the task instance has available retries, it 
will be retried.",
-ti,
+tis: Iterable[TaskInstance] = []
+with suppress(NotImplementedError):
+# BaseExecutor has "abstract" method 
`cleanup_stuck_queued_tasks`
+# We are tolerant of implementers not implementing it.
+tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis)
+for ti in tis:
+if not isinstance(ti, TaskInstance):
+# todo: when can we remove this?
+#   this is for backcompat. the pre-2.10.4 version of the 
interface
+#   expected a string return val.
+self.log.warning(
+"Task instance %s stuck in queued.  May be set to 
failed.",
+ti,
+)
+continue
+
+self.log.warning("Task stuck in queued and may be requeued. 
task_id=%s", ti.key)
+
+num_times_stuck = self._get_num_times_stuck_in_queued(ti, 
session)
+if num_times_stuck < num_allowed_retries:
+session.add(
+Log(
+event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT,
+task_instance=ti.key,
+extra=(
+f"Task was in queued state for longer than 
{self._task_queued_timeout} "
+"seconds; task state will be set back to 
scheduled."
+),
 )
-session.add(
-Log(
-event="stuck in queued",
-task_instance=ti.key,
-extra=(
-"Task will be marked as failed. If the 
task instance has "
-"available retries, it will be retried."
-),
-)
+)
+with suppress(KeyError):
+executor.running.remove(ti.key)
+self._reschedule_stuck_task(ti)
+else:
+self.log.warning(
+"Task requeue attempts exceeded max; marking failed. 
task_instance=%s", ti
+)
+session.add(
+Log(
+event="stuck in queued tries exceeded",
+task_instance=ti.key,
+extra=(
+f"Task was requeued more than 
{num_allowed_retries} times "
+"and will be failed."
+),
 )
-except NotImplementedError:
-self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
+)
+executor.fail(ti.key)
+
+@provide_session
+def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
+session.execute(
+update(TI)
+.where(TI.filter_for_tis([ti]))
+.values(
+state=TaskInstanceState.SCHEDULED,
+queued_dttm=None,
+)
+.execution_options(synchronize_session=False)
+)
+
+@provide_session
+def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: 
Session = NEW_SESSION) -> int:
+"""
+Check the Log table to see how many times a

Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1835163761


##
airflow/executors/base_executor.py:
##
@@ -540,7 +540,9 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+def cleanup_stuck_queued_tasks(
+self, tis: list[TaskInstance]
+) -> Iterable[TaskInstance]:  # pragma: no cover

Review Comment:
   outdated






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1835163642


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session 
= NEW_SESSION) -> None:
 )
 ).all()
 
+num_allowed_retries = conf.getint("scheduler", 
"num_stuck_in_queued_retries")
 for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-try:
-cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-for ti in stuck_tis:
-if repr(ti) in cleaned_up_task_instances:
-self.log.warning(
-"Marking task instance %s stuck in queued as 
failed. "
-"If the task instance has available retries, it 
will be retried.",
-ti,
+tis: Iterable[TaskInstance] = []
+with suppress(NotImplementedError):
+# BaseExecutor has "abstract" method 
`cleanup_stuck_queued_tasks`
+# We are tolerant of implementers not implementing it.
+tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis)
+for ti in tis:
+if not isinstance(ti, TaskInstance):
+# todo: when can we remove this?
+#   this is for backcompat. the pre-2.10.4 version of the 
interface
+#   expected a string return val.
+self.log.warning(
+"Task instance %s stuck in queued.  May be set to 
failed.",

Review Comment:
   outdated; we added new func



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session 
= NEW_SESSION) -> None:
 )
 ).all()
 
+num_allowed_retries = conf.getint("scheduler", 
"num_stuck_in_queued_retries")
 for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-try:
-cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-for ti in stuck_tis:
-if repr(ti) in cleaned_up_task_instances:
-self.log.warning(
-"Marking task instance %s stuck in queued as 
failed. "
-"If the task instance has available retries, it 
will be retried.",
-ti,
+tis: Iterable[TaskInstance] = []
+with suppress(NotImplementedError):
+# BaseExecutor has "abstract" method 
`cleanup_stuck_queued_tasks`
+# We are tolerant of implementers not implementing it.
+tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis)
+for ti in tis:
+if not isinstance(ti, TaskInstance):
+# todo: when can we remove this?
+#   this is for backcompat. the pre-2.10.4 version of the 
interface
+#   expected a string return val.
+self.log.warning(
+"Task instance %s stuck in queued.  May be set to 
failed.",
+ti,
+)
+continue
+
+self.log.warning("Task stuck in queued and may be requeued. 
task_id=%s", ti.key)

Review Comment:
   updated






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1835154201


##
airflow/executors/base_executor.py:
##
@@ -540,7 +540,9 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+def cleanup_stuck_queued_tasks(
+self, tis: list[TaskInstance]
+) -> Iterable[TaskInstance]:  # pragma: no cover
 """
 Handle remnants of tasks that were failed because they were stuck in 
queued.

Review Comment:
   no longer changing this function's behavior






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1835018068


##
airflow/executors/base_executor.py:
##
@@ -540,7 +540,9 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+def cleanup_stuck_queued_tasks(
+self, tis: list[TaskInstance]
+) -> Iterable[TaskInstance]:  # pragma: no cover
 """
 Handle remnants of tasks that were failed because they were stuck in 
queued.

Review Comment:
   We should reword this docstring too.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1834877987


##
airflow/executors/base_executor.py:
##
@@ -540,7 +540,9 @@ def terminate(self):
 """Get called when the daemon receives a SIGTERM."""
 raise NotImplementedError
 
-def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> 
list[str]:  # pragma: no cover
+def cleanup_stuck_queued_tasks(
+self, tis: list[TaskInstance]
+) -> Iterable[TaskInstance]:  # pragma: no cover

Review Comment:
   I'm a bit concerned about this return type change (str -> TI), and also the 
behavior change for this method. BaseExecutor is public, so we have to consider 
executors beyond those in our repos. And today, we expect other executors to 
fail tasks in this method.



##
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py:
##
@@ -443,6 +446,23 @@ def delete_pod(self, pod_name: str, namespace: str) -> 
None:
 if str(e.status) != "404":
 raise
 
+def patch_pod_delete_stuck(self, *, pod_name: str, namespace: str):
+"""Add a "done" annotation to ensure we don't continually adopt 
pods."""

Review Comment:
   docstring was never updated after copy/paste



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session 
= NEW_SESSION) -> None:
 )
 ).all()
 
+num_allowed_retries = conf.getint("scheduler", 
"num_stuck_in_queued_retries")
 for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-try:
-cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-for ti in stuck_tis:
-if repr(ti) in cleaned_up_task_instances:
-self.log.warning(
-"Marking task instance %s stuck in queued as 
failed. "
-"If the task instance has available retries, it 
will be retried.",
-ti,
+tis: Iterable[TaskInstance] = []
+with suppress(NotImplementedError):
+# BaseExecutor has "abstract" method 
`cleanup_stuck_queued_tasks`
+# We are tolerant of implementers not implementing it.
+tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis)
+for ti in tis:
+if not isinstance(ti, TaskInstance):
+# todo: when can we remove this?
+#   this is for backcompat. the pre-2.10.4 version of the 
interface
+#   expected a string return val.
+self.log.warning(
+"Task instance %s stuck in queued.  May be set to 
failed.",

Review Comment:
   If we get a str back, shouldn't we assume it was set to failed? That's the 
current expectation, right?



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1796,28 +1808,95 @@ def _fail_tasks_stuck_in_queued(self, session: Session 
= NEW_SESSION) -> None:
 )
 ).all()
 
+num_allowed_retries = conf.getint("scheduler", 
"num_stuck_in_queued_retries")
 for executor, stuck_tis in 
self._executor_to_tis(tasks_stuck_in_queued).items():
-try:
-cleaned_up_task_instances = 
set(executor.cleanup_stuck_queued_tasks(tis=stuck_tis))
-for ti in stuck_tis:
-if repr(ti) in cleaned_up_task_instances:
-self.log.warning(
-"Marking task instance %s stuck in queued as 
failed. "
-"If the task instance has available retries, it 
will be retried.",
-ti,
+tis: Iterable[TaskInstance] = []
+with suppress(NotImplementedError):
+# BaseExecutor has "abstract" method 
`cleanup_stuck_queued_tasks`
+# We are tolerant of implementers not implementing it.
+tis = executor.cleanup_stuck_queued_tasks(tis=stuck_tis)
+for ti in tis:
+if not isinstance(ti, TaskInstance):
+# todo: when can we remove this?
+#   this is for backcompat. the pre-2.10.4 version of the 
interface
+#   expected a string return val.
+self.log.warning(
+"Task instance %s stuck in queued.  May be set to 
failed.",
+ti,
+)
+continue
+
+self.log.warning("Task stuck in queued and may be requeued. 
task_id=%s", ti.key)
+
+num_times_stuck = self._get_num_times_stuck_in_queued(ti, 
session)
+if num_times_stuck < num_allowed_retries:
+session.add(
+ 

Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-08 Thread via GitHub


dimberman commented on PR #43520:
URL: https://github.com/apache/airflow/pull/43520#issuecomment-2465299664

   I have tested this PR by doing the following:
   
   1. Created two Airflows in helm, one CeleryExecutor and one KubernetesExecutor, and lowered the timeout for queued tasks to 10 seconds
   2. Ensured that both Airflows ran correctly
   3. For the CeleryExecutor I then removed the workers and ensured that the correct logs showed up
   4. For the KubernetesExecutor I modified the pod_template_file to ensure that the pod would be stuck in pending. In the same fashion, the following logs showed up, indicating that the scheduler attempted to restart the queued task 3 times before failing
   
   Screenshot: https://github.com/user-attachments/assets/dd5d700a-a526-48b8-999f-f3829c7526e2
   
   
   





Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-05 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1829989061


##
providers/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py:
##
@@ -636,6 +638,10 @@ def cleanup_stuck_queued_tasks(self, tis: 
list[TaskInstance]) -> list[str]:
 self.log.warning("Cannot find pod for ti %s", ti)
 continue
 readable_tis.append(repr(ti))
+if Version(airflow_version) < Version("2.10.4"):

Review Comment:
   should this be a greater than?






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-04 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1827890135


##
airflow/config_templates/config.yml:
##
@@ -386,6 +386,14 @@ core:
   type: integer
   example: ~
   default: "30"
+num_stuck_reschedules:
+  description: |
+Number of times Airflow will reschedule a task that gets stuck in 
queued before marking the
+task as failed
+  version_added: 2.10.0

Review Comment:
   ```suggestion
 version_added: 2.10.4
   ```






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826231840


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -98,6 +98,8 @@
 DR = DagRun
 DM = DagModel
 
+RETRY_STUCK_IN_QUEUED_EVENT = "retrying stuck in queued"

Review Comment:
   ```suggestion
   RESCHEDULE_STUCK_IN_QUEUED_EVENT = "rescheduling stuck in queued"
   ```
   
   etc.



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1785,15 +1787,23 @@ def _send_dag_callbacks_to_processor(self, dag: DAG, 
callback: DagCallbackReques
 self.log.debug("callback is empty")
 
 @provide_session
-def _fail_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
+def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> 
None:
 """
-Mark tasks stuck in queued for longer than `task_queued_timeout` as 
failed.
+
+Handle the scenario where a task is queued for longer than 
`task_queued_timeout`.
 
 Tasks can get stuck in queued for a wide variety of reasons (e.g. 
celery loses
 track of a task, a cluster can't further scale up its workers, etc.), 
but tasks
-should not be stuck in queued for a long time. This will mark tasks 
stuck in
-queued for longer than `self._task_queued_timeout` as failed. If the 
task has
-available retries, it will be retried.
+should not be stuck in queued for a long time.
+
+Originally, we simply marked a task as failed when it was stuck in 
queued for
+too long. We found that this led to suboptimal outcomes as ideally we 
would like "failed"
+to mean that a task was unable to run, instead of it meaning that we 
were unable to run the task.
+
+As a compromise between always failing a stuck task and always 
rescheduling a stuck task (which could
+lead to tasks being stuck in queued forever without informing the 
user), we have creating the config
+`AIRFLOW__CORE__NUM_STUCK_RETRIES`. With this new configuration, an 
airflow admin can decide how

Review Comment:
   ```suggestion
   `[core] num_stuck_reschedules`. With this new configuration, an 
airflow admin can decide how
   ```






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


dimberman commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826198858


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: 
list[TaskInstance]) -> list[str]:
 for ti in tis:
 readable_tis.append(repr(ti))
 task_instance_key = ti.key
-self.fail(task_instance_key, None)

Review Comment:
   @dstandish yep added exactly that.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826132005


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: 
list[TaskInstance]) -> list[str]:
 for ti in tis:
 readable_tis.append(repr(ti))
 task_instance_key = ti.key
-self.fail(task_instance_key, None)

Review Comment:
   it's possible in the provider to inspect the airflow version and handle 
conditionally
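   A minimal sketch of that conditional handling (editorial; the cutoff mirrors the `Version(airflow_version) < Version("2.10.4")` check quoted elsewhere in this thread, and the function body is illustrative, not the provider's final code):

```python
from packaging.version import Version

from airflow import __version__ as airflow_version


def cleanup_stuck_queued_tasks_compat(executor, tis):
    readable_tis = []
    for ti in tis:
        readable_tis.append(repr(ti))
        if Version(airflow_version) < Version("2.10.4"):
            # Older cores still expect this method to fail the task itself.
            executor.fail(ti.key, None)
        # Newer cores decide requeue-or-fail in the scheduler instead.
    return readable_tis
```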
   






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


dimberman commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826114963


##
uv.lock:
##


Review Comment:
   Oh weird, I explicitly deleted this and committed, I'll add that to my global gitignore






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


dimberman commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826114221


##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: 
list[TaskInstance]) -> list[str]:
 for ti in tis:
 readable_tis.append(repr(ti))
 task_instance_key = ti.key
-self.fail(task_instance_key, None)

Review Comment:
   @jedcunningham can you explain a bit further? Is there a workaround we can 
consider? The reason we did this is because the idea of failing the task and 
then setting it to scheduled after felt fraught with potential bugs






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


jedcunningham commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826054815


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1817,17 +1828,59 @@ def _fail_tasks_stuck_in_queued(self, session: Session 
= NEW_SESSION) -> None:
 )
 session.add(
 Log(
-event="stuck in queued",
+event=RETRY_STUCK_IN_QUEUED_EVENT,
 task_instance=ti.key,
 extra=(
-"Task will be marked as failed. If the 
task instance has "
+f"Task was stuck in queued and will be 
retried, once it has hit {num_allowed_retries} attempts"

Review Comment:
   ```suggestion
   f"Task was stuck in queued and will be 
requeued, once it has hit {num_allowed_retries} attempts"
   ```
   
   It might be good to avoid "retry" here to avoid confusion between 
dag-author-configured-retries and these retries.



##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1805,6 +1815,7 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 )
 ).all()
 
+num_allowed_retries = conf.getint("core", "num_stuck_retries", 
fallback=2)

Review Comment:
   ```suggestion
   num_allowed_retries = conf.getint("core", "num_stuck_retries")
   ```
   
   You need to add this to 
[config.yml](https://github.com/apache/airflow/blob/main/airflow/config_templates/config.yml)
 (and then won't need a fallback).



##
uv.lock:
##


Review Comment:
   I assume you didn't intend to add this :)



##
providers/src/airflow/providers/celery/executors/celery_executor.py:
##
@@ -450,7 +450,6 @@ def cleanup_stuck_queued_tasks(self, tis: 
list[TaskInstance]) -> list[str]:
 for ti in tis:
 readable_tis.append(repr(ti))
 task_instance_key = ti.key
-self.fail(task_instance_key, None)

Review Comment:
   This is problematic - folks could install the provider with this code onto 
2.8.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


dimberman commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826037304


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1822,9 +1822,30 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 ),
 )
 )

Review Comment:
   Yes I've created separate logs for rescheduling and failing of tasks






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-11-01 Thread via GitHub


dimberman commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1826037677


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1822,9 +1822,33 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 ),
 )
 )
+num_times_stuck = self._get_num_times_stuck(ti, 
session)
+if num_times_stuck < conf.getint("core", 
"num_stuck_retries", fallback=3):
+self._reset_task_instance(ti, session)
+else:
+executor.fail(ti.key)
+
+
 except NotImplementedError:
 self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
 
+@provide_session
+def _get_num_times_stuck(self, ti: TaskInstance, session: Session = 
NEW_SESSION) -> int:
+return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id)
+.where(Log.dag_id == ti.dag_id)
+.where(Log.run_id == ti.run_id)
+.where(Log.map_index == ti.map_index)
+.where(Log.try_number == ti.try_number)
+.where(Log.event == "stuck in queued")

Review Comment:
   Agreed, was on my list to do before taking PR out of draft :)






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-10-30 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1823520481


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1822,9 +1822,33 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 ),
 )
 )
+num_times_stuck = self._get_num_times_stuck(ti, 
session)
+if num_times_stuck < conf.getint("core", 
"num_stuck_retries", fallback=3):
+self._reset_task_instance(ti, session)
+else:
+executor.fail(ti.key)
+
+
 except NotImplementedError:
 self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
 
+@provide_session
+def _get_num_times_stuck(self, ti: TaskInstance, session: Session = 
NEW_SESSION) -> int:
+return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id)
+.where(Log.dag_id == ti.dag_id)
+.where(Log.run_id == ti.run_id)
+.where(Log.map_index == ti.map_index)
+.where(Log.try_number == ti.try_number)
+.where(Log.event == "stuck in queued")

Review Comment:
   simpler / cleaner i think to just use the `*args` e.g.
   ```suggestion
   .where(
   Log.dag_id == ti.dag_id,
   Log.run_id == ti.run_id,
   Log.map_index == ti.map_index,
   Log.try_number == ti.try_number,
   Log.event == "stuck in queued",
   )
   ```






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-10-30 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1823518651


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1822,9 +1822,33 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 ),
 )
 )
+num_times_stuck = self._get_num_times_stuck(ti, 
session)
+if num_times_stuck < conf.getint("core", 
"num_stuck_retries", fallback=3):
+self._reset_task_instance(ti, session)
+else:
+executor.fail(ti.key)
+
+
 except NotImplementedError:
 self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
 
+@provide_session
+def _get_num_times_stuck(self, ti: TaskInstance, session: Session = 
NEW_SESSION) -> int:
+return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id)
+.where(Log.dag_id == ti.dag_id)
+.where(Log.run_id == ti.run_id)
+.where(Log.map_index == ti.map_index)
+.where(Log.try_number == ti.try_number)
+.where(Log.event == "stuck in queued")

Review Comment:
   should be a constant of some kind to ensure that the events are created / looked up with the same text
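   A minimal sketch of that suggestion (editorial; the constant's value is a placeholder — the point is that the writer and the reader of the Log table share one name):

```python
# Module-level constant used by both the code that records the event and the
# code that counts prior occurrences of it.
TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT = "stuck in queued reschedule"  # placeholder text

# When recording:
#     session.add(Log(event=TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT, task_instance=ti.key, ...))
# When counting prior occurrences:
#     .where(Log.event == TASK_STUCK_IN_QUEUED_RESCHEDULE_EVENT)
```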






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-10-30 Thread via GitHub


o-nikolas commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1823354279


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1822,9 +1822,30 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 ),
 )
 )

Review Comment:
   Can you update the wording in the logging and docstring above, given the new 
behaviour.






Re: [PR] Allow for retry when tasks are stuck in queued [airflow]

2024-10-30 Thread via GitHub


dstandish commented on code in PR #43520:
URL: https://github.com/apache/airflow/pull/43520#discussion_r1823096131


##
airflow/jobs/scheduler_job_runner.py:
##
@@ -1822,9 +1822,30 @@ def _fail_tasks_stuck_in_queued(self, session: Session = 
NEW_SESSION) -> None:
 ),
 )
 )
+num_times_stuck = self._get_num_times_stuck(ti, 
session)
+if num_times_stuck < conf.getint("core", 
"num_stuck_retries", fallback=3):
+self._reset_task_instance(ti, session)
+
+
 except NotImplementedError:
 self.log.debug("Executor doesn't support cleanup of stuck 
queued tasks. Skipping.")
 
+@provide_session
+def _get_num_times_stuck(self, ti: TaskInstance, session: Session = 
NEW_SESSION) -> int:
+return len(session.scalars(select(Log).where(Log.dag_id == ti.task_id)
+.where(Log.dag_id == ti.dag_id)
+.where(Log.run_id == ti.run_id)
+.where(Log.map_index == ti.map_index)
+.where(Log.try_number == ti.try_number)
+.where(Log.event == "stuck in queued")
+))
+
+def _reset_task_instance(self, ti: TaskInstance, session: Session = 
NEW_SESSION):
+ti.external_executor_id = None
+ti.state = State.SCHEDULED
+session.add(ti)

Review Comment:
   Merge not add


