Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2881941183


##
airflow-core/src/airflow/models/deadline_alert.py:
##
@@ -86,6 +86,8 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
 @property
 def reference_class(self) -> 
type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
 """Return the deserialized reference class."""
+if "__class_path" in self.reference:
+return SerializedReferenceModels.SerializedCustomReference

Review Comment:
   Maybe—if I understand you correctly, not sure if I do



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877959964


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
 def deserialize_reference(cls, reference_data: dict):
 return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for 
required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)

Review Comment:
   Handled in [comments from 
kaxil](https://github.com/apache/airflow/pull/61461/commits/0d6ff563d1e57550b4fb3766a8834004c835fef7)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877999592


##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (

Review Comment:
   Yes, we should be maintaining backcompat here and I have tried to do that as 
per above comment as well: [add support for older 
refs](https://github.com/apache/airflow/pull/61461/commits/32455aca2b288ca2769868aa101abbaae3937f77)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877999592


##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (

Review Comment:
   Yes, we should be maintaining backcompat here and I have tried to do that as 
per above comment as well: 32455aca2b



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877967949


##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (
+"airflow.sdk.definitions.deadline",
+"airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
 """
 Encode a deadline reference.
 
+For custom (non-builtin) deadline references, includes the class path
+so the decoder can import the user's class at runtime.
+
 :meta private:
 """
-return ref.serialize_reference()
+from airflow._shared.module_loading import qualname
+
+serialized = ref.serialize_reference()
+
+# Custom types (not built-in) need __class_path so the decoder can import 
them.
+# Unlike built-in types which are looked up in SerializedReferenceModels,
+# custom types are discovered via import_string(__class_path) at 
deserialization time.
+module = type(ref).__module__
+if module not in _BUILTIN_DEADLINE_MODULES:

Review Comment:
   Thanks, it is a real concern. I will handle it by adding 
`airflow.models.deadline` into builtins for now, feels easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877959539


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
 def deserialize_reference(cls, reference_data: dict):
 return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for 
required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)

Review Comment:
   I handled it as suggested, thanks for the thorough review



##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
 def deserialize_reference(cls, reference_data: dict):
 return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for 
required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)

Review Comment:
   Handled in 0d6ff563d1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


kaxil commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877246051


##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (

Review Comment:
   Open question: is it intentional that legacy/core `ReferenceModels.*` 
references are now outside the builtin set? If backwards compatibility is 
desired, should this tuple include `"airflow.models.deadline"` as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-03 Thread via GitHub


kaxil commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877244580


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
 def deserialize_reference(cls, reference_data: dict):
 return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for 
required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)

Review Comment:
   I think this bypasses the base `evaluate_with()` validation/filtering path. 
`SerializedBaseDeadlineReference.evaluate_with()` validates `required_kwargs` 
and drops extras before calling `_evaluate_with()`, but here we call 
`inner_ref._evaluate_with()` directly with all kwargs. That can break custom 
refs with strict signatures (or missing keys) at runtime. Could we mirror the 
base filtering/validation logic here and then delegate with filtered kwargs?



##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (
+"airflow.sdk.definitions.deadline",
+"airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
 """
 Encode a deadline reference.
 
+For custom (non-builtin) deadline references, includes the class path
+so the decoder can import the user's class at runtime.
+
 :meta private:
 """
-return ref.serialize_reference()
+from airflow._shared.module_loading import qualname
+
+serialized = ref.serialize_reference()
+
+# Custom types (not built-in) need __class_path so the decoder can import 
them.
+# Unlike built-in types which are looked up in SerializedReferenceModels,
+# custom types are discovered via import_string(__class_path) at 
deserialization time.
+module = type(ref).__module__
+if module not in _BUILTIN_DEADLINE_MODULES:

Review Comment:
   Potential compatibility risk here: refs from `airflow.models.deadline` are 
no longer treated as builtins, so they now rely on `__class_path` import during 
decode. If that path resolves to nested classes (e.g. under `ReferenceModels`), 
`import_string()` currently supports only top-level attributes, which can fail 
deserialization. Should we either include `airflow.models.deadline` in builtins 
or make import resolution nested-class aware?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-02 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2871162165


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -259,7 +310,9 @@ class TYPES:
 )
 SerializedReferenceModels.TYPES.DAGRUN_QUEUED = 
(SerializedReferenceModels.DagRunQueuedAtDeadline,)
 SerializedReferenceModels.TYPES.DAGRUN = (
-SerializedReferenceModels.TYPES.DAGRUN_CREATED + 
SerializedReferenceModels.TYPES.DAGRUN_QUEUED
+SerializedReferenceModels.TYPES.DAGRUN_CREATED
++ SerializedReferenceModels.TYPES.DAGRUN_QUEUED
++ (SerializedReferenceModels.SerializedCustomReference,)
 )

Review Comment:
   Handled in [comments from 
tp](https://github.com/apache/airflow/pull/61461/commits/4394c58781d57bdf99d13ad3e18d5a5a404f9c34)



##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,52 @@ def serialize_reference(self) -> dict:
 def deserialize_reference(cls, reference_data: dict):
 return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""Wrapper for custom deadline references."""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+filtered_kwargs = {k: v for k, v in kwargs.items() if k in 
self.required_kwargs}

Review Comment:
   Good catch, handled in [pass to inner_ref through 
kwargs](https://github.com/apache/airflow/pull/61461/commits/c65ae8a147bed7f962b622f78d51cef937ebef60)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-02 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2871152953


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -259,7 +310,9 @@ class TYPES:
 )
 SerializedReferenceModels.TYPES.DAGRUN_QUEUED = 
(SerializedReferenceModels.DagRunQueuedAtDeadline,)
 SerializedReferenceModels.TYPES.DAGRUN = (
-SerializedReferenceModels.TYPES.DAGRUN_CREATED + 
SerializedReferenceModels.TYPES.DAGRUN_QUEUED
+SerializedReferenceModels.TYPES.DAGRUN_CREATED
++ SerializedReferenceModels.TYPES.DAGRUN_QUEUED
++ (SerializedReferenceModels.SerializedCustomReference,)
 )

Review Comment:
   All good! It does the trick
   
   ```python
   mine = (
   *SerializedReferenceModels.TYPES.DAGRUN_CREATED,
   *SerializedReferenceModels.TYPES.DAGRUN_QUEUED,
   SerializedReferenceModels.SerializedCustomReference,
   )
   mine
   Out[7]: 
   
(airflow.serialization.definitions.deadline.SerializedReferenceModels.DagRunLogicalDateDeadline,

airflow.serialization.definitions.deadline.SerializedReferenceModels.FixedDatetimeDeadline,

airflow.serialization.definitions.deadline.SerializedReferenceModels.AverageRuntimeDeadline,

airflow.serialization.definitions.deadline.SerializedReferenceModels.DagRunQueuedAtDeadline,

airflow.serialization.definitions.deadline.SerializedReferenceModels.SerializedCustomReference)
   mine == SerializedReferenceModels.TYPES.DAGRUN
   Out[8]: True
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-02 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2871140716


##
airflow-core/src/airflow/models/deadline_alert.py:
##
@@ -86,6 +86,8 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
 @property
 def reference_class(self) -> 
type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
 """Return the deserialized reference class."""
+if "__class_path" in self.reference:
+return SerializedReferenceModels.SerializedCustomReference

Review Comment:
   Hmm, what do you instead think of matching the reference type _with all 
other_ builtin types and if none matches, return a custom reference?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-01 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870768056


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -259,7 +310,9 @@ class TYPES:
 )
 SerializedReferenceModels.TYPES.DAGRUN_QUEUED = 
(SerializedReferenceModels.DagRunQueuedAtDeadline,)
 SerializedReferenceModels.TYPES.DAGRUN = (
-SerializedReferenceModels.TYPES.DAGRUN_CREATED + 
SerializedReferenceModels.TYPES.DAGRUN_QUEUED
+SerializedReferenceModels.TYPES.DAGRUN_CREATED
++ SerializedReferenceModels.TYPES.DAGRUN_QUEUED
++ (SerializedReferenceModels.SerializedCustomReference,)
 )

Review Comment:
   Not sure if this would work
   
   ```python
   SerializedReferenceModels.TYPES.DAGRUN = (
   *SerializedReferenceModels.TYPES.DAGRUN_CREATED,
   *SerializedReferenceModels.TYPES.DAGRUN_QUEUED,
   SerializedReferenceModels.SerializedCustomReference,
   )
   ```
   
   Slightly more readable to me if it does.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-01 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870765936


##
airflow-core/src/airflow/models/deadline_alert.py:
##
@@ -86,6 +86,8 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
 @property
 def reference_class(self) -> 
type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
 """Return the deserialized reference class."""
+if "__class_path" in self.reference:
+return SerializedReferenceModels.SerializedCustomReference

Review Comment:
   I wonder if there’s a better way to do this (Not saying this is wrong, just 
the criteria is a bit random and maybe too easy to break)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-01 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870619189


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,52 @@ def serialize_reference(self) -> dict:
 def deserialize_reference(cls, reference_data: dict):
 return cls(max_runs=reference_data["max_runs"], 
min_runs=reference_data.get("min_runs"))
 
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""Wrapper for custom deadline references."""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, 
**kwargs: Any) -> datetime | None:
+filtered_kwargs = {k: v for k, v in kwargs.items() if k in 
self.required_kwargs}

Review Comment:
   Good catch, handled in [comments from 
kaxil](https://github.com/apache/airflow/pull/61461/commits/f78d6235c6400934175912142e29eafaf59421a9)



##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -118,69 +223,64 @@ class TYPES:
 
 # Deadlines that should be created when the DagRun is created.
 DAGRUN_CREATED: DeadlineReferenceTypes = (
-ReferenceModels.DagRunLogicalDateDeadline,
-ReferenceModels.FixedDatetimeDeadline,
-ReferenceModels.AverageRuntimeDeadline,
+DagRunLogicalDateDeadline,
+FixedDatetimeDeadline,
+AverageRuntimeDeadline,
 )
 
 # Deadlines that should be created when the DagRun is queued.
-DAGRUN_QUEUED: DeadlineReferenceTypes = 
(ReferenceModels.DagRunQueuedAtDeadline,)
+DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
 
 # All DagRun-related deadline types.
 DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
 
-from airflow.models.deadline import ReferenceModels
-
-DAGRUN_LOGICAL_DATE: DeadlineReferenceType = 
ReferenceModels.DagRunLogicalDateDeadline()
-DAGRUN_QUEUED_AT: DeadlineReferenceType = 
ReferenceModels.DagRunQueuedAtDeadline()
+DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
 
 @classmethod
 def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) 
-> DeadlineReferenceType:
 if max_runs == 0:
-max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
 if min_runs is None:
 min_runs = max_runs
-return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+return AverageRuntimeDeadline(max_runs, min_runs)
 
 @classmethod
-def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
-return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+return FixedDatetimeDeadline(dt)
 
 # TODO: Remove this once other deadline types exist.
 #   This is a temporary reference type used only in tests to verify that
 #   dag.has_dagrun_deadline() returns false if the dag has a non-dagrun 
deadline type.
 #   It should be replaced with a real non-dagrun deadline type when one is 
available.
 _TEMPORARY_TEST_REFERENCE = type(
 "TemporaryTestDeadlineForTypeChecking",
-(DeadlineReferenceType,),
-{"_evaluate_with": lambda self, **kwargs: datetime.now()},
+(BaseDeadlineReference,),
+{"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD: 
"TemporaryTestDeadlineForTypeChecking"}},
 )()
 
 @classmethod
 def register_custom_reference(
 cls,
-reference_class: type[ReferenceModels.BaseDeadlineReference],
+reference_class: type[BaseDeadlineReference],
 deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> type[ReferenceModels.BaseDeadlineReference]:
+) -> type[BaseDeadlineReference]:
 """
 Register a custom deadline reference class.
 
 :param reference_class: The custom reference class inheriting from 
BaseDeadlineReference
 :param deadline_reference_type: A DeadlineReference.TYPES for when the 
deadline should be evaluated ("DAGRUN_CREATED",
 "DAGRUN_QUEUED", etc.); defaults to 
DeadlineReference.TYPES.DAGRUN_CREATED
 """
-from airflow.models.deadline import ReferenceModels
-
 # Default to DAGRUN_CREATED if no deadline_reference_type specified
 if deadline_reference_type is None:
 deadline_reference_type = cls.TYPES.DAGRUN_CREATED
 
 # Validate the reference class inherits from BaseDeadlineReference
-if not issubclass(reference_class, 
ReferenceModels.BaseDeadli

Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-03-01 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870589608


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -118,69 +223,64 @@ class TYPES:
 
 # Deadlines that should be created when the DagRun is created.
 DAGRUN_CREATED: DeadlineReferenceTypes = (
-ReferenceModels.DagRunLogicalDateDeadline,
-ReferenceModels.FixedDatetimeDeadline,
-ReferenceModels.AverageRuntimeDeadline,
+DagRunLogicalDateDeadline,
+FixedDatetimeDeadline,
+AverageRuntimeDeadline,
 )
 
 # Deadlines that should be created when the DagRun is queued.
-DAGRUN_QUEUED: DeadlineReferenceTypes = 
(ReferenceModels.DagRunQueuedAtDeadline,)
+DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
 
 # All DagRun-related deadline types.
 DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
 
-from airflow.models.deadline import ReferenceModels
-
-DAGRUN_LOGICAL_DATE: DeadlineReferenceType = 
ReferenceModels.DagRunLogicalDateDeadline()
-DAGRUN_QUEUED_AT: DeadlineReferenceType = 
ReferenceModels.DagRunQueuedAtDeadline()
+DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
 
 @classmethod
 def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) 
-> DeadlineReferenceType:
 if max_runs == 0:
-max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
 if min_runs is None:
 min_runs = max_runs
-return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+return AverageRuntimeDeadline(max_runs, min_runs)
 
 @classmethod
-def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
-return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+return FixedDatetimeDeadline(dt)
 
 # TODO: Remove this once other deadline types exist.
 #   This is a temporary reference type used only in tests to verify that
 #   dag.has_dagrun_deadline() returns false if the dag has a non-dagrun 
deadline type.
 #   It should be replaced with a real non-dagrun deadline type when one is 
available.
 _TEMPORARY_TEST_REFERENCE = type(
 "TemporaryTestDeadlineForTypeChecking",
-(DeadlineReferenceType,),
-{"_evaluate_with": lambda self, **kwargs: datetime.now()},
+(BaseDeadlineReference,),
+{"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD: 
"TemporaryTestDeadlineForTypeChecking"}},
 )()
 
 @classmethod
 def register_custom_reference(
 cls,
-reference_class: type[ReferenceModels.BaseDeadlineReference],
+reference_class: type[BaseDeadlineReference],
 deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> type[ReferenceModels.BaseDeadlineReference]:
+) -> type[BaseDeadlineReference]:
 """
 Register a custom deadline reference class.
 
 :param reference_class: The custom reference class inheriting from 
BaseDeadlineReference
 :param deadline_reference_type: A DeadlineReference.TYPES for when the 
deadline should be evaluated ("DAGRUN_CREATED",
 "DAGRUN_QUEUED", etc.); defaults to 
DeadlineReference.TYPES.DAGRUN_CREATED
 """
-from airflow.models.deadline import ReferenceModels
-
 # Default to DAGRUN_CREATED if no deadline_reference_type specified
 if deadline_reference_type is None:
 deadline_reference_type = cls.TYPES.DAGRUN_CREATED
 
 # Validate the reference class inherits from BaseDeadlineReference
-if not issubclass(reference_class, 
ReferenceModels.BaseDeadlineReference):
+if not issubclass(reference_class, BaseDeadlineReference):

Review Comment:
   Good call! It might break for such users, let me add a compat shim, I was 
under the impression that since this is an experimental feature, we should be 
OK to do it, but maybe thats not a great idea!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-16 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2815482327


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -226,7 +224,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: 
Any) -> datetime | None:
 return None
 
 avg_duration_seconds = sum(durations) / len(durations)
-return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
+return timezone.utcnow() + 
timedelta(seconds=float(avg_duration_seconds))

Review Comment:
   Now I remember why, I saw this failure:
   ```python
   airflow-core/tests/unit/models/test_deadline.py:480: in 
test_average_runtime_with_min_runs
   result = reference.evaluate_with(session=session, interval=interval, 
dag_id=DAG_ID)
   airflow-core/src/airflow/serialization/definitions/deadline.py:104: in 
evaluate_with
   base_time = self._evaluate_with(session=session, **filtered_kwargs)
   airflow-core/src/airflow/utils/session.py:98: in wrapper
   return func(*args, **kwargs)
   airflow-core/src/airflow/serialization/definitions/deadline.py:229: in 
_evaluate_with
   return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
   E   TypeError: unsupported type for timedelta seconds component: 
decimal.Decimal
   ```
   
   
   But I am handling it exactly how it is done by models/deadline: 
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/deadline.py#L417-L429



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-16 Thread via GitHub


amoghrajesh closed pull request #61461: Decouple deadline reference types from 
core in task SDK
URL: https://github.com/apache/airflow/pull/61461


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-16 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2815267654


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -226,7 +224,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: 
Any) -> datetime | None:
 return None
 
 avg_duration_seconds = sum(durations) / len(durations)
-return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
+return timezone.utcnow() + 
timedelta(seconds=float(avg_duration_seconds))

Review Comment:
   Actually I do not remember why I made that, just reverting it, seems 
unnecessary. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-16 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2815258094


##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -193,19 +193,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (
+"airflow.sdk.definitions.deadline",
+"airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
 """
 Encode a deadline reference.
 
+For custom (non-builtin) deadline references, includes the class path
+so the decoder can import the user's class at runtime.
+
 :meta private:
 """
-return ref.serialize_reference()
+from airflow._shared.module_loading import qualname
+
+serialized = ref.serialize_reference()
+
+# Custom types (not built-in) need __class_path so the decoder can import 
them.
+# Unlike built-in types which are looked up in SerializedReferenceModels,
+# custom types are discovered via import_string(__class_path) at 
deserialization time.
+module = type(ref).__module__
+if module not in _BUILTIN_DEADLINE_MODULES:
+serialized["__class_path"] = qualname(type(ref))

Review Comment:
   You're right, it does. Handled it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-13 Thread via GitHub


amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3896826246

   Good question, it lead me to performing some more testing and it revealed 
that deadline alerts with custom refs broke after my last PR: 
https://github.com/apache/airflow/pull/61118.
   
   I reworked this one, made some changes and have pushed it and that's 
reflected in the PR desc. Let me know what you thinlk


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-11 Thread via GitHub


ferruzzi commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3887841391

   I don't understand how this is intended to work.  Can you show me what a dag 
definition looks like now?  Also, what is the process for adding a new 
reference now?  Do we still add the logic in models path but now also and the 
empty class signature in the definitions file as well?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-11 Thread via GitHub


ferruzzi commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2796084465


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -203,29 +303,36 @@ def register_custom_reference(
 
 def deadline_reference(
 deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> Callable[[type[ReferenceModels.BaseDeadlineReference]], 
type[ReferenceModels.BaseDeadlineReference]]:
+) -> Callable[[type[BaseDeadlineReference]], type[BaseDeadlineReference]]:
 """
 Decorate a class to register a custom deadline reference.
 
 Usage:
 @deadline_reference()
-class MyCustomReference(ReferenceModels.BaseDeadlineReference):
+class MyCustomReference(BaseDeadlineReference):
 # By default, evaluate_with will be called when a new dagrun is 
created.
 def _evaluate_with(self, *, session: Session, **kwargs) -> 
datetime:
-# Put your business logic here
+# Put your business logic here (use deferred imports for Core 
types)
+from airflow.models import DagRun
 return some_datetime
 
+def serialize_reference(self) -> dict:
+return {"reference_type": self.reference_name}
+
 @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
-class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
+class MyQueuedRef(BaseDeadlineReference):
 # Optionally, you can specify when you want it calculated by 
providing a DeadlineReference.TYPES
 def _evaluate_with(self, *, session: Session, **kwargs) -> 
datetime:
  # Put your business logic here

Review Comment:
   I assume you want to put the same note about imports as you did in the other 
example?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-10 Thread via GitHub


uranusjr commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3882458691

   I think this is good more or less now with a couple of minor suggestions. 
Also would appreciate @ferruzzi if you could provide some perspective if this 
would step on your work too much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-10 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2791547162


##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -193,19 +193,39 @@ def encode_deadline_alert(d: DeadlineAlert | 
SerializedDeadlineAlert) -> dict[st
 from airflow.sdk.serde import serialize
 
 return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
 "interval": d.interval.total_seconds(),
 "callback": serialize(d.callback),
 }
 
 
+_BUILTIN_DEADLINE_MODULES = (
+"airflow.sdk.definitions.deadline",
+"airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
 """
 Encode a deadline reference.
 
+For custom (non-builtin) deadline references, includes the class path
+so the decoder can import the user's class at runtime.
+
 :meta private:
 """
-return ref.serialize_reference()
+from airflow._shared.module_loading import qualname
+
+serialized = ref.serialize_reference()
+
+# Custom types (not built-in) need __class_path so the decoder can import 
them.
+# Unlike built-in types which are looked up in SerializedReferenceModels,
+# custom types are discovered via import_string(__class_path) at 
deserialization time.
+module = type(ref).__module__
+if module not in _BUILTIN_DEADLINE_MODULES:
+serialized["__class_path"] = qualname(type(ref))

Review Comment:
   ```suggestion
   serialized["__class_path"] = qualname(ref)
   ```
   
   IIRC qualname handles this automatically



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-10 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2791545765


##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -226,7 +224,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: 
Any) -> datetime | None:
 return None
 
 avg_duration_seconds = sum(durations) / len(durations)
-return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
+return timezone.utcnow() + 
timedelta(seconds=float(avg_duration_seconds))

Review Comment:
   Why is this cast needed?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-10 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2787214241


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False
+
+@property
+def reference_name(self) -> str:
+"""Return the class name as the reference identifier."""
+return self.__class__.__name__
+
+def serialize_reference(self) -> dict[str, Any]:
+"""
+Serialize this reference type into a dictionary representation.
+
+Override this method in subclasses if additional data is needed for 
serialization.
+"""
+return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+@classmethod
+def deserialize_reference(cls, reference_data: dict[str, Any]) -> 
BaseDeadlineReference:
+"""
+Deserialize a reference type from its dictionary representation.
+
+:param reference_data: Dictionary containing serialized reference data.
+"""
+return cls()
+
+def __eq__(self, other: object) -> bool:
+if not isinstance(other, BaseDeadlineReference):
+return NotImplemented
+return self.serialize_reference() == other.serialize_reference()
+
+def __hash__(self) -> int:
+return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+"""A deadline that returns a DagRun's logical date."""
+
+__is_builtin__ = True
+
+def serialize_reference(self) -> dict[str, Any]:
+return {REFERENCE_TYPE_FIELD: self.reference_name}

Review Comment:
   I just dropped the commit and am removing inheritnace from child classes 
where not needed: fc38cafb2d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781468006


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False
+
+@property
+def reference_name(self) -> str:
+"""Return the class name as the reference identifier."""
+return self.__class__.__name__
+
+def serialize_reference(self) -> dict[str, Any]:
+"""
+Serialize this reference type into a dictionary representation.
+
+Override this method in subclasses if additional data is needed for 
serialization.
+"""
+return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+@classmethod
+def deserialize_reference(cls, reference_data: dict[str, Any]) -> 
BaseDeadlineReference:
+"""
+Deserialize a reference type from its dictionary representation.
+
+:param reference_data: Dictionary containing serialized reference data.
+"""
+return cls()
+
+def __eq__(self, other: object) -> bool:
+if not isinstance(other, BaseDeadlineReference):
+return NotImplemented
+return self.serialize_reference() == other.serialize_reference()
+
+def __hash__(self) -> int:
+return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+"""A deadline that returns a DagRun's logical date."""
+
+__is_builtin__ = True
+
+def serialize_reference(self) -> dict[str, Any]:
+return {REFERENCE_TYPE_FIELD: self.reference_name}

Review Comment:
   Just tried to do it in [comments from 
tp](https://github.com/apache/airflow/pull/61461/commits/33fcd264b8bfbff9160b95de88c87a39d8d0c9b9)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781444229


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False
+
+@property
+def reference_name(self) -> str:
+"""Return the class name as the reference identifier."""
+return self.__class__.__name__
+
+def serialize_reference(self) -> dict[str, Any]:
+"""
+Serialize this reference type into a dictionary representation.
+
+Override this method in subclasses if additional data is needed for 
serialization.
+"""
+return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+@classmethod
+def deserialize_reference(cls, reference_data: dict[str, Any]) -> 
BaseDeadlineReference:
+"""
+Deserialize a reference type from its dictionary representation.
+
+:param reference_data: Dictionary containing serialized reference data.
+"""
+return cls()
+
+def __eq__(self, other: object) -> bool:
+if not isinstance(other, BaseDeadlineReference):
+return NotImplemented
+return self.serialize_reference() == other.serialize_reference()
+
+def __hash__(self) -> int:
+return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+"""A deadline that returns a DagRun's logical date."""
+
+__is_builtin__ = True
+
+def serialize_reference(self) -> dict[str, Any]:
+return {REFERENCE_TYPE_FIELD: self.reference_name}

Review Comment:
   Interesting perspective about serializer injecting it, we should be able to 
yes since serializer anyways owns that the serialization flow



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781426429


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False
+
+@property
+def reference_name(self) -> str:
+"""Return the class name as the reference identifier."""
+return self.__class__.__name__
+
+def serialize_reference(self) -> dict[str, Any]:
+"""
+Serialize this reference type into a dictionary representation.
+
+Override this method in subclasses if additional data is needed for 
serialization.
+"""
+return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+@classmethod
+def deserialize_reference(cls, reference_data: dict[str, Any]) -> 
BaseDeadlineReference:
+"""
+Deserialize a reference type from its dictionary representation.
+
+:param reference_data: Dictionary containing serialized reference data.
+"""
+return cls()
+
+def __eq__(self, other: object) -> bool:
+if not isinstance(other, BaseDeadlineReference):
+return NotImplemented
+return self.serialize_reference() == other.serialize_reference()
+
+def __hash__(self) -> int:
+return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+"""A deadline that returns a DagRun's logical date."""
+
+__is_builtin__ = True
+
+def serialize_reference(self) -> dict[str, Any]:
+return {REFERENCE_TYPE_FIELD: self.reference_name}

Review Comment:
   Umm yes you are right, we should be able to safely remove it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781372150


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False

Review Comment:
   Umm maybe what if I can add it in the encoder maybe?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781304639


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False
+
+@property
+def reference_name(self) -> str:
+"""Return the class name as the reference identifier."""
+return self.__class__.__name__
+
+def serialize_reference(self) -> dict[str, Any]:
+"""
+Serialize this reference type into a dictionary representation.
+
+Override this method in subclasses if additional data is needed for 
serialization.
+"""
+return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+@classmethod
+def deserialize_reference(cls, reference_data: dict[str, Any]) -> 
BaseDeadlineReference:
+"""
+Deserialize a reference type from its dictionary representation.
+
+:param reference_data: Dictionary containing serialized reference data.
+"""
+return cls()
+
+def __eq__(self, other: object) -> bool:
+if not isinstance(other, BaseDeadlineReference):
+return NotImplemented
+return self.serialize_reference() == other.serialize_reference()
+
+def __hash__(self) -> int:
+return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+"""A deadline that returns a DagRun's logical date."""
+
+__is_builtin__ = True
+
+def serialize_reference(self) -> dict[str, Any]:
+return {REFERENCE_TYPE_FIELD: self.reference_name}

Review Comment:
   I wonder if `{REFERENCE_TYPE_FIELD: self.reference_name}` should be injected 
automatically in the serializer so subclasses don’t need to repeat it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781300720


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False
+
+@property
+def reference_name(self) -> str:
+"""Return the class name as the reference identifier."""
+return self.__class__.__name__
+
+def serialize_reference(self) -> dict[str, Any]:
+"""
+Serialize this reference type into a dictionary representation.
+
+Override this method in subclasses if additional data is needed for 
serialization.
+"""
+return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+@classmethod
+def deserialize_reference(cls, reference_data: dict[str, Any]) -> 
BaseDeadlineReference:
+"""
+Deserialize a reference type from its dictionary representation.
+
+:param reference_data: Dictionary containing serialized reference data.
+"""
+return cls()
+
+def __eq__(self, other: object) -> bool:
+if not isinstance(other, BaseDeadlineReference):
+return NotImplemented
+return self.serialize_reference() == other.serialize_reference()
+
+def __hash__(self) -> int:
+return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+"""A deadline that returns a DagRun's logical date."""
+
+__is_builtin__ = True
+
+def serialize_reference(self) -> dict[str, Any]:
+return {REFERENCE_TYPE_FIELD: self.reference_name}

Review Comment:
   This does not seem needed? It’s exactly the same as in the base. (Same for 
DagRunQueuedAtDeadline)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-09 Thread via GitHub


uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781295991


##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 
 logger = logging.getLogger(__name__)
 
-DeadlineReferenceTypes: TypeAlias = 
tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with 
SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+"""
+Base class for all Deadline Reference implementations.
+
+This is a lightweight SDK class for DAG authoring. It only handles 
serialization.
+The actual evaluation logic (_evaluate_with) is in Core's 
SerializedReferenceModels.
+
+For custom deadline references, users should inherit from this class and 
implement
+_evaluate_with() with deferred Core imports (imports inside the method 
body).
+"""
+
+# way to detect builtin types. custom types inherit False while builtins 
set this to True
+__is_builtin__: bool = False

Review Comment:
   This feels a bit too fragile to me. It’s probably better to maintain a list 
somewhere in core that the user can’t simply overwrite.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-08 Thread via GitHub


amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3869930729

   There's some failing checks which I am looking at


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-08 Thread via GitHub


amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3867524395

   cc: @ferruzzi this is related to the discussion thread on slack


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Decouple deadline reference types from core in task SDK [airflow]

2026-02-04 Thread via GitHub


amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3848880058

   Still a draft, sent it out to run an early CI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]