Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi merged PR #51638: URL: https://github.com/apache/airflow/pull/51638 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on PR #51638: URL: https://github.com/apache/airflow/pull/51638#issuecomment-3047147471 @o-nikolas pointed out an opportunity to make it a little more efficient, so I took that. Final revision (I hope) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2180494013
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -179,6 +176,27 @@ class DeadlineReference:
deadline.evaluate_with()
"""
+class TYPES:
+"""Collection of DeadlineReference types for type checking."""
+
+# Deadlines that should be created when the DagRun is created.
+DAGRUN_CREATED = tuple(
+{
+ReferenceModels.DagRunLogicalDateDeadline,
+ReferenceModels.FixedDatetimeDeadline,
+}
+)
Review Comment:
tuple(set()) enforces unique entries, I did it that was as a safety feature.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ramitkataria commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2167830001 ## airflow-core/tests/unit/models/test_deadline.py: ## @@ -91,7 +91,8 @@ def test_add_deadline(self, dagrun, session): dagrun_id=dagrun.id, ) -Deadline.add_deadline(deadline_orm) +session.add(deadline_orm) +session.flush() Review Comment: Is it because there's other tests (`test_dagrun_queued_at_deadline`, `test_dagrun_success_deadline` etc) that would test the writing of deadlines to the database? If so, I think we should still test that the values were saved correctly and match the expected result -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2162860951 ## airflow-core/src/airflow/models/deadline.py: ## @@ -143,7 +137,7 @@ class BaseDeadlineReference(LoggingMixin, ABC): def reference_name(cls: Any) -> str: return cls.__name__ -def evaluate_with(self, **kwargs: Any) -> datetime: +def evaluate_with(self, session: Session, interval: timedelta, **kwargs: Any) -> datetime: Review Comment: ```suggestion def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime: ``` This applies to all `evaluate_with` and `_evaluate_with`. Make these all keyword-only arguments since they are only used as such, and this avoids potential future headaches. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2162863155
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -179,6 +176,27 @@ class DeadlineReference:
deadline.evaluate_with()
"""
+class TYPES:
+"""Collection of DeadlineReference types for type checking."""
+
+# Deadlines that should be created when the DagRun is created.
+DAGRUN_CREATED = tuple(
+{
+ReferenceModels.DagRunLogicalDateDeadline,
+ReferenceModels.FixedDatetimeDeadline,
+}
+)
Review Comment:
Why not just
```suggestion
DAGRUN_CREATED = (
ReferenceModels.DagRunLogicalDateDeadline,
ReferenceModels.FixedDatetimeDeadline,
)
```
Same below
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2162861960 ## airflow-core/tests/unit/models/test_deadline.py: ## @@ -91,7 +91,8 @@ def test_add_deadline(self, dagrun, session): dagrun_id=dagrun.id, ) -Deadline.add_deadline(deadline_orm) +session.add(deadline_orm) +session.flush() Review Comment: This test is probably not needed anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ramitkataria commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2162707409 ## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ## @@ -1769,6 +1770,25 @@ def _update_state(dag: DAG, dag_run: DagRun): _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed() +if (deadline := dag.deadline) and isinstance( +deadline.reference, ReferenceModels.DagRunQueuedAtDeadline +): Review Comment: I agree, I've removed it for now and if it ends up being repetitive, we could add a function to get deadline by reference type -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2160800660 ## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ## @@ -1769,6 +1770,25 @@ def _update_state(dag: DAG, dag_run: DagRun): _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed() +if (deadline := dag.deadline) and isinstance( +deadline.reference, ReferenceModels.DagRunQueuedAtDeadline +): Review Comment: Would it be a good idea to add a `get_dagrun_queued_at_deadline` on DAG like the creation deadline? Or remove the function for the creation deadline? It seems weird one has a function and one does not. If this feels repetitive, maybe the function should be changed to like `get_deadline(kind: type[T]) -> T` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2160800660 ## airflow-core/src/airflow/jobs/scheduler_job_runner.py: ## @@ -1769,6 +1770,25 @@ def _update_state(dag: DAG, dag_run: DagRun): _update_state(dag, dag_run) dag_run.notify_dagrun_state_changed() +if (deadline := dag.deadline) and isinstance( +deadline.reference, ReferenceModels.DagRunQueuedAtDeadline +): Review Comment: Would it be a good idea to add a `get_dagrun_queued_at_deadline` on DAG like the creation deadline? Or remove the function for the creation deadline? It seems weird one has a function and one does not. If this feels repetitive, maybe the function should be changed to like `get_deadline(kind: ReferenceModels) -> Deadline | None` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2159362500 ## airflow-core/src/airflow/models/dag.py: ## @@ -1613,6 +1623,22 @@ def create_dagrun( session=session, ) +if dag_deadline := self.get_dagrun_creation_deadlines(): +Deadline.add_deadline( Review Comment: I think I got it sorted. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2159363068
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
I think I got it sorted, ready for a re-review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2155451324 ## airflow-core/src/airflow/models/dag.py: ## @@ -463,6 +464,15 @@ def validate_executor_field(self): "update the executor configuration for this task." ) +def get_dagrun_creation_deadlines(self) -> DeadlineAlert | None: +"""If the DAG has a deadline related to DagRun, return it; else return None.""" +if ( +not (deadline := self.deadline) +or type(deadline.reference) not in DeadlineReference.TYPES.DAGRUN_CREATED Review Comment: Nevermind, I'll just make the change. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2153251765 ## airflow-core/src/airflow/models/dag.py: ## @@ -1613,6 +1623,22 @@ def create_dagrun( session=session, ) +if dag_deadline := self.get_dagrun_creation_deadlines(): +Deadline.add_deadline( Review Comment: Doing this means passing session into `_set_dagrun_queued_deadline` which loops back to the issue I have above (https://github.com/apache/airflow/pull/51638#discussion_r2143136539) where that starts messing with other tests... :thinking: hmm. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2152902316 ## airflow-core/src/airflow/models/dag.py: ## @@ -463,6 +464,15 @@ def validate_executor_field(self): "update the executor configuration for this task." ) +def get_dagrun_creation_deadlines(self) -> DeadlineAlert | None: +"""If the DAG has a deadline related to DagRun, return it; else return None.""" +if ( +not (deadline := self.deadline) +or type(deadline.reference) not in DeadlineReference.TYPES.DAGRUN_CREATED Review Comment: Oh, I was actually looking at that last night. I'm going to switch that `if` around, it's a bit cleaner as: ``` if (deadline := self.deadline) and type(deadline.reference) in DeadlineReference.TYPES.DAGRUN_CREATED: return deadline return None ``` For `type(foo) in bar` vs `isinstance(foo, tuple(bar))`, I kinda like the easier readability of the first but don't feel strongly. I'll cast the TYPES to a tuple in their definition and switch to a more simplified version of your suggestion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2152762078 ## airflow-core/src/airflow/models/dag.py: ## @@ -1613,6 +1623,22 @@ def create_dagrun( session=session, ) +if dag_deadline := self.get_dagrun_creation_deadlines(): +Deadline.add_deadline( Review Comment: Cool, sounds good. I'll make the change then. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2151430603 ## airflow-core/src/airflow/models/dag.py: ## @@ -1613,6 +1623,22 @@ def create_dagrun( session=session, ) +if dag_deadline := self.get_dagrun_creation_deadlines(): +Deadline.add_deadline( Review Comment: I prefer just doing `session.add` too. We do that everywhere else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2151429192 ## airflow-core/src/airflow/models/dag.py: ## @@ -463,6 +464,15 @@ def validate_executor_field(self): "update the executor configuration for this task." ) +def get_dagrun_creation_deadlines(self) -> DeadlineAlert | None: +"""If the DAG has a deadline related to DagRun, return it; else return None.""" +if ( +not (deadline := self.deadline) +or type(deadline.reference) not in DeadlineReference.TYPES.DAGRUN_CREATED Review Comment: Is this better? ```suggestion or not isinstance(deadline.reference, tuple(DeadlineReference.TYPES.DAGRUN_CREATED)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2150427968 ## airflow-core/src/airflow/models/dag.py: ## @@ -1613,6 +1623,22 @@ def create_dagrun( session=session, ) +if dag_deadline := self.get_dagrun_creation_deadlines(): +Deadline.add_deadline( Review Comment: Yeah, I can see that I guess. It felt more "intuitive" this way and allowed for future expansion if we decided we wanted logging or who knows what later but maybe I was just overthinking/overengineering it. How strongly do you feel about this, is it a blocking change or can we leave it to see if anyone else wants to weigh in? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2150368039 ## airflow-core/src/airflow/models/dagrun.py: ## @@ -67,9 +67,10 @@ from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound from airflow.listeners.listener import get_listener_manager -from airflow.models import Log +from airflow.models import Deadline, Log Review Comment: I let the IDE do it's thing and didn't catch that, thanks. I'll fix it. Edit: Strange, there's even already a `from airflow.models.deadline import ReferenceModels`, I wonder why it did it this way. Either way, thanks for catching it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2150368039 ## airflow-core/src/airflow/models/dagrun.py: ## @@ -67,9 +67,10 @@ from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound from airflow.listeners.listener import get_listener_manager -from airflow.models import Log +from airflow.models import Deadline, Log Review Comment: I let the IDE do it's thing and didn't catch that, thanks. I'll fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
pierrejeambrun commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2150320680
##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##
@@ -421,6 +421,21 @@ def trigger_dag_run(
state=DagRunState.QUEUED,
session=session,
)
+
+if dag_deadline := dag.get_dagrun_deadline():
+Deadline.add_deadline(
+Deadline(
+deadline=dag_deadline.reference.evaluate_with(
+interval=dag_deadline.interval,
+dag_id=dag.dag_id,
+),
+callback=dag_deadline.callback,
+callback_kwargs=dag_deadline.callback_kwargs or {},
+dag_id=dag.dag_id,
+dagrun_id=dag_run.run_id,
+)
+)
+
Review Comment:
> Maybe this should be in models/dag/create_dagrun if they all funnel
through there?
Yes that could be it indeed. We'd need to verify that they all go through
there.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ashb commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2147474950 ## airflow-core/src/airflow/models/dagrun.py: ## @@ -67,9 +67,10 @@ from airflow.configuration import conf as airflow_conf from airflow.exceptions import AirflowException, TaskNotFound from airflow.listeners.listener import get_listener_manager -from airflow.models import Log +from airflow.models import Deadline, Log Review Comment: We shouldn't be adding new "convince" imports to airflow.models (as those were mostly there for users/dag authors in the 2.x era) but import directly from the sub module ## airflow-core/src/airflow/models/dag.py: ## @@ -1613,6 +1623,22 @@ def create_dagrun( session=session, ) +if dag_deadline := self.get_dagrun_creation_deadlines(): +Deadline.add_deadline( Review Comment: Given all those method does is session.add I would rather we did that directly here. Airflow devs know what session.add does via having an extra method in the way introduces extra cognitive load of "what does this actually do?" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2145529882
##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##
@@ -421,6 +421,21 @@ def trigger_dag_run(
state=DagRunState.QUEUED,
session=session,
)
+
+if dag_deadline := dag.get_dagrun_deadline():
+Deadline.add_deadline(
+Deadline(
+deadline=dag_deadline.reference.evaluate_with(
+interval=dag_deadline.interval,
+dag_id=dag.dag_id,
+),
+callback=dag_deadline.callback,
+callback_kwargs=dag_deadline.callback_kwargs or {},
+dag_id=dag.dag_id,
+dagrun_id=dag_run.run_id,
+)
+)
+
Review Comment:
I think you are right, I was conflating API and SDK and thought all dagrun
creations fed through here. Maybe this should be in models/dag/create_dagrun
if they all funnel through there?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
pierrejeambrun commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2144951588
##
airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py:
##
@@ -421,6 +421,21 @@ def trigger_dag_run(
state=DagRunState.QUEUED,
session=session,
)
+
+if dag_deadline := dag.get_dagrun_deadline():
+Deadline.add_deadline(
+Deadline(
+deadline=dag_deadline.reference.evaluate_with(
+interval=dag_deadline.interval,
+dag_id=dag.dag_id,
+),
+callback=dag_deadline.callback,
+callback_kwargs=dag_deadline.callback_kwargs or {},
+dag_id=dag.dag_id,
+dagrun_id=dag_run.run_id,
+)
+)
+
Review Comment:
There are other places where we 'create' a dagrun, should all those places
also handle the associated deadline creation, or is that specific to this
endpoint?
(We can name `trigger_dag` that is used in the command line or execution
api, some other places calling `dag.create_dagrun` which will not create the
deadline like we are doing here)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2143136539
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
Sure, sorry about that.If I change the signature to
```
@provide_session
def _set_dagrun_queued_deadline(self, session: Session = NEW_SESSION):
```
Then three tests start to fail:
```
[2025-06-12T08:57:29.667-0700] {dagbag.py:585} INFO - Filling up the DagBag
from /dev/null
FAILED [ 2%][2025-06-12T08:57:29.672-0700] {collection.py:82} INFO -
Creating ORM DAG for test_clear_task_instances_for_backfill_finished_dagrun
airflow-core/tests/unit/models/test_dagrun.py:159
(TestDagRun.test_clear_task_instances_for_backfill_finished_dagrun[success])
0 != 1
Expected :1
Actual :0
unit/models/test_dagrun.py:173: in
test_clear_task_instances_for_backfill_finished_dagrun
assert dr0.clear_number == 1
E assert 0 == 1
E+ where 0 = .clear_number
[2025-06-12T08:57:29.827-0700] {dagbag.py:585} INFO - Filling up the DagBag
from /dev/null
FAILED [ 3%][2025-06-12T08:57:29.833-0700] {collection.py:82} INFO -
Creating ORM DAG for test_clear_task_instances_for_backfill_finished_dagrun
airflow-core/tests/unit/models/test_dagrun.py:159
(TestDagRun.test_clear_task_instances_for_backfill_finished_dagrun[failed])
0 != 1
Expected :1
Actual :0
unit/models/test_dagrun.py:173: in
test_clear_task_instances_for_backfill_finished_dagrun
assert dr0.clear_number == 1
E assert 0 == 1
E+ where 0 = .clear_number
```
and
```
FAILED [ 40%]
airflow-core/tests/unit/models/test_dagrun.py:1029
(TestDagRun.test_next_dagruns_to_examine_only_unpaused[queued])
[] != []
Expected :[]
Actual :[]
unit/models/test_dagrun.py:1077: in
test_next_dagruns_to_examine_only_unpaused
assert runs == []
E assert equals failed
E -[ +[]
E - ,
E -]
PASSED [ 41%]
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141796058 ## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py: ## @@ -421,6 +421,18 @@ def trigger_dag_run( state=DagRunState.QUEUED, session=session, ) + +if dag.deadline and dag.has_dagrun_deadline(): Review Comment: That also fails with the error that deadline is assigned but never used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141932321
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -155,7 +155,7 @@ def evaluate_with(self, **kwargs: Any) -> datetime:
if extra_kwargs := kwargs.keys() - filtered_kwargs.keys():
self.log.debug("Ignoring unexpected parameters: %s", ",
".join(extra_kwargs))
-return self._evaluate_with(**filtered_kwargs)
+return self._evaluate_with(**filtered_kwargs) + interval
Review Comment:
Why does this need to change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141929161
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
Why do we need to flush here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ephraimbuddy commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2142110670
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
Can you paste the error? It's not clear and not opening for me when clicked
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141813662 ## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py: ## @@ -421,6 +421,18 @@ def trigger_dag_run( state=DagRunState.QUEUED, session=session, ) + +if dag.deadline and dag.has_dagrun_deadline(): Review Comment: oh, are you talking about replacing the boolen `has_dagrun_deadline` with `get_dagrun_deadline` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141629390 ## task-sdk/src/airflow/sdk/definitions/deadline.py: ## @@ -179,6 +176,18 @@ class DeadlineReference: deadline.evaluate_with() """ +class TYPES: +"""Collection of DeadlineReference types for type checking.""" + +from airflow.models.deadline import ReferenceModels Review Comment: I don’t think this import is needed? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141316967 ## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py: ## @@ -421,6 +421,18 @@ def trigger_dag_run( state=DagRunState.QUEUED, session=session, ) + +if dag.deadline and dag.has_dagrun_deadline(): Review Comment: This is a little redundant, but mypy required the `if dag.deadline` check to be separate like that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141995172
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
The issue I ran into with that was if I provide a session to
`_set_dagrun_queued_deadline`, even if I don't use it for anything, just add it
to the signature, it breaks three of the existing tests and I'm not sure why.
Do you perhaps see the cause there? Either way, it's 1AM and I need sleep,
I'll poke it more "in the morning".

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141937666
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -155,7 +155,7 @@ def evaluate_with(self, **kwargs: Any) -> datetime:
if extra_kwargs := kwargs.keys() - filtered_kwargs.keys():
self.log.debug("Ignoring unexpected parameters: %s", ",
".join(extra_kwargs))
-return self._evaluate_with(**filtered_kwargs)
+return self._evaluate_with(**filtered_kwargs) + interval
Review Comment:
That is what it was supposed to be returning. The `deadline` table should
be storing the calculated deadline time, which is the reference plus the
interval
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141931684
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
It’s likely better to remove `provide_session` (and always pass in a session
object), and flush the session before calling this function instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141929607
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -155,7 +155,7 @@ def evaluate_with(self, **kwargs: Any) -> datetime:
if extra_kwargs := kwargs.keys() - filtered_kwargs.keys():
self.log.debug("Ignoring unexpected parameters: %s", ",
".join(extra_kwargs))
-return self._evaluate_with(**filtered_kwargs)
+return self._evaluate_with(**filtered_kwargs) + interval
Review Comment:
Why does this need to change?
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
Why do we need to flush here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141929607
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -155,7 +155,7 @@ def evaluate_with(self, **kwargs: Any) -> datetime:
if extra_kwargs := kwargs.keys() - filtered_kwargs.keys():
self.log.debug("Ignoring unexpected parameters: %s", ",
".join(extra_kwargs))
-return self._evaluate_with(**filtered_kwargs)
+return self._evaluate_with(**filtered_kwargs) + interval
Review Comment:
Why does this need to change?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638:
URL: https://github.com/apache/airflow/pull/51638#discussion_r2141928242
##
airflow-core/src/airflow/models/deadline.py:
##
@@ -220,9 +220,12 @@ class DagRunQueuedAtDeadline(BaseDeadlineReference):
required_kwargs = {"dag_id"}
-def _evaluate_with(self, **kwargs: Any) -> datetime:
+@provide_session
+def _evaluate_with(self, session=NEW_SESSION, **kwargs: Any) ->
datetime:
from airflow.models import DagRun
+session.flush()
Review Comment:
This could be the wrong place for this flush. Unit tests had a race
condition where the queued_at value was not yet written to the db before this
tried to query for it, so I added this to ensure that it is always flushed
before checking?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141741606 ## task-sdk/src/airflow/sdk/definitions/deadline.py: ## @@ -179,6 +176,18 @@ class DeadlineReference: deadline.evaluate_with() """ +class TYPES: +"""Collection of DeadlineReference types for type checking.""" + +from airflow.models.deadline import ReferenceModels Review Comment: You are correct, I initially had it here so it was only imported for the type_checking part, but I ended up needing to add that at the module level anyway for the deserialization. I'll remove it down here. Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
uranusjr commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141627395 ## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py: ## @@ -421,6 +421,18 @@ def trigger_dag_run( state=DagRunState.QUEUED, session=session, ) + +if dag.deadline and dag.has_dagrun_deadline(): Review Comment: I usually write this sort of things like this ```python def get_dagrun_deadline(self) -> Deadline: ... if deadline := dag.get_dagrun_deadline(): ... ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
boring-cyborg[bot] commented on PR #51638: URL: https://github.com/apache/airflow/pull/51638#issuecomment-2964646225 Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst) Here are some useful points: - Pay attention to the quality of your code (ruff, mypy and type annotations). Our [pre-commits]( https://github.com/apache/airflow/blob/main/contributing-docs/08_static_code_checks.rst#prerequisites-for-pre-commit-hooks) will help you with that. - In case of a new feature add useful documentation (in docstrings or in `docs/` directory). Adding a new operator? Check this short [guide](https://github.com/apache/airflow/blob/main/airflow-core/docs/howto/custom-operator.rst) Consider adding an example DAG that shows how users should use it. - Consider using [Breeze environment](https://github.com/apache/airflow/blob/main/dev/breeze/doc/README.rst) for testing locally, it's a heavy docker but it ships with a working Airflow and a lot of integrations. - Be patient and persistent. It might take some time to get a review or get the final approval from Committers. - Please follow [ASF Code of Conduct](https://www.apache.org/foundation/policies/conduct) for all communication including (but not limited to) comments on Pull Requests, Mailing list and Slack. - Be sure to read the [Airflow Coding style]( https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#coding-style-and-best-practices). - Always keep your Pull Requests rebased, otherwise your build might fail due to changes not related to your commits. Apache Airflow is a community-driven project and together we are making it better 🚀. In case of doubts contact the developers at: Mailing List: [email protected] Slack: https://s.apache.org/airflow-slack -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141796058 ## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py: ## @@ -421,6 +421,18 @@ def trigger_dag_run( state=DagRunState.QUEUED, session=session, ) + +if dag.deadline and dag.has_dagrun_deadline(): Review Comment: That also fails with the error that deadline is assigned but never used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
Re: [PR] AIP-86 - Calculate and store the dagrun Deadline [airflow]
ferruzzi commented on code in PR #51638: URL: https://github.com/apache/airflow/pull/51638#discussion_r2141764844 ## airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py: ## @@ -421,6 +421,18 @@ def trigger_dag_run( state=DagRunState.QUEUED, session=session, ) + +if dag.deadline and dag.has_dagrun_deadline(): Review Comment: interesting, I wonder why mypy would allow that but not `if dag.get_dagrun_deadline():`?I'll give it a try, thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
