Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2881941183
##
airflow-core/src/airflow/models/deadline_alert.py:
##
@@ -86,6 +86,8 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
@property
def reference_class(self) -> type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
"""Return the deserialized reference class."""
+if "__class_path" in self.reference:
+return SerializedReferenceModels.SerializedCustomReference
Review Comment:
Maybe, if I understand you correctly (not sure if I do).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877959964
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)
Review Comment:
Handled in [comments from
kaxil](https://github.com/apache/airflow/pull/61461/commits/0d6ff563d1e57550b4fb3766a8834004c835fef7)
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877999592
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize
return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
}
+_BUILTIN_DEADLINE_MODULES = (
Review Comment:
Yes, we should be maintaining backcompat here and I have tried to do that as
per above comment as well: [add support for older
refs](https://github.com/apache/airflow/pull/61461/commits/32455aca2b288ca2769868aa101abbaae3937f77)
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877999592
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize
return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
}
+_BUILTIN_DEADLINE_MODULES = (
Review Comment:
Yes, we should be maintaining backcompat here and I have tried to do that as
per above comment as well: 32455aca2b
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877967949
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize
return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
}
+_BUILTIN_DEADLINE_MODULES = (
+"airflow.sdk.definitions.deadline",
+"airflow.serialization.definitions.deadline",
+)
+
+
def encode_deadline_reference(ref) -> dict[str, Any]:
"""
Encode a deadline reference.
+For custom (non-builtin) deadline references, includes the class path
+so the decoder can import the user's class at runtime.
+
:meta private:
"""
-return ref.serialize_reference()
+from airflow._shared.module_loading import qualname
+
+serialized = ref.serialize_reference()
+
+# Custom types (not built-in) need __class_path so the decoder can import them.
+# Unlike built-in types which are looked up in SerializedReferenceModels,
+# custom types are discovered via import_string(__class_path) at deserialization time.
+module = type(ref).__module__
+if module not in _BUILTIN_DEADLINE_MODULES:
Review Comment:
Thanks, it is a real concern. I will handle it by adding
`airflow.models.deadline` to the builtins for now; that feels easier.
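A minimal sketch of the encoder with `airflow.models.deadline` added to the builtin set, as proposed above. It is an illustration under stated assumptions, not the PR's code. The reference classes are hypothetical stand-ins, the `reference_type` key is a placeholder, and `airflow._shared.module_loading.qualname` is approximated with `__module__` plus `__qualname__`.

```python
from __future__ import annotations

from typing import Any

# Modules whose reference types are resolved by name on decode; the third
# entry is the addition discussed in this thread.
_BUILTIN_DEADLINE_MODULES = (
    "airflow.sdk.definitions.deadline",
    "airflow.serialization.definitions.deadline",
    "airflow.models.deadline",
)


def encode_deadline_reference(ref: Any) -> dict[str, Any]:
    """Serialize ``ref``, tagging non-builtin types with an importable class path."""
    serialized = dict(ref.serialize_reference())
    cls = type(ref)
    if cls.__module__ not in _BUILTIN_DEADLINE_MODULES:
        # Approximation of qualname(): nested classes yield "mod.Outer.Inner".
        serialized["__class_path"] = f"{cls.__module__}.{cls.__qualname__}"
    return serialized


class MyTeamDeadline:  # hypothetical user-defined reference
    def serialize_reference(self) -> dict[str, Any]:
        return {"reference_type": "MyTeamDeadline"}  # placeholder key


class LegacyDeadline:  # hypothetical stand-in for a ReferenceModels.* class
    def serialize_reference(self) -> dict[str, Any]:
        return {"reference_type": "DagRunLogicalDateDeadline"}  # placeholder key


# Pretend this class lives in the legacy core module.
LegacyDeadline.__module__ = "airflow.models.deadline"
```

With this tuple, a legacy `ReferenceModels.*` instance serializes without `__class_path`, so the decoder never needs to import a nested class path for it. Only genuinely custom references carry one.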
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877959539
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)
Review Comment:
I handled it as suggested, thanks for the thorough review
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)
Review Comment:
Handled in 0d6ff563d1
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
kaxil commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877246051
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize
return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
}
+_BUILTIN_DEADLINE_MODULES = (
Review Comment:
Open question: is it intentional that legacy/core `ReferenceModels.*`
references are now outside the builtin set? If backwards compatibility is
desired, should this tuple include `"airflow.models.deadline"` as well?
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
kaxil commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2877244580
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,49 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""
+Wrapper for custom deadline references.
+
+This class dynamically delegates to the wrapped reference for required_kwargs and evaluation logic.
+"""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
+"""Pass through all kwargs to inner reference without filtering."""
+deadline = self.inner_ref._evaluate_with(session=session, **kwargs)
Review Comment:
I think this bypasses the base `evaluate_with()` validation/filtering path.
`SerializedBaseDeadlineReference.evaluate_with()` validates `required_kwargs`
and drops extras before calling `_evaluate_with()`, but here we call
`inner_ref._evaluate_with()` directly with all kwargs. That can break custom
refs with strict signatures (or missing keys) at runtime. Could we mirror the
base filtering/validation logic here and then delegate with filtered kwargs?
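A minimal sketch of the suggestion above: validate `required_kwargs` first, then drop extras, then delegate. `CustomReferenceWrapper` and `StrictDagReference` are hypothetical stand-ins, not Airflow classes. Two further assumptions: the inner reference exposes a `required_kwargs` collection, and the final deadline is the base time plus `interval`.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Any


class CustomReferenceWrapper:
    """Hypothetical stand-in for SerializedCustomReference."""

    def __init__(self, inner_ref: Any) -> None:
        self.inner_ref = inner_ref

    @property
    def required_kwargs(self) -> set[str]:
        # Assumption: the inner reference declares the kwargs it needs.
        return set(getattr(self.inner_ref, "required_kwargs", ()))

    def evaluate_with(self, *, session: Any, interval: timedelta, **kwargs: Any) -> datetime | None:
        # Validate first, so a missing key fails with a clear message...
        missing = self.required_kwargs.difference(kwargs)
        if missing:
            raise ValueError(
                f"{type(self.inner_ref).__name__} is missing required kwargs: {sorted(missing)}"
            )
        # ...then drop extras, so strict signatures never see unexpected keys.
        filtered = {k: v for k, v in kwargs.items() if k in self.required_kwargs}
        base_time = self.inner_ref._evaluate_with(session=session, **filtered)
        # Assumption: the deadline is the base time plus the alert interval.
        return None if base_time is None else base_time + interval


class StrictDagReference:
    """Hypothetical custom reference with a strict keyword-only signature."""

    required_kwargs = {"dag_id"}

    def _evaluate_with(self, *, session: Any, dag_id: str) -> datetime:
        return datetime(2024, 1, 1, tzinfo=timezone.utc)
```

Without the filtering step, passing an extra key such as `run_id` would raise a `TypeError` inside `StrictDagReference._evaluate_with`, which is the runtime breakage described in the comment.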
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -200,19 +200,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
from airflow.sdk.serde import serialize
return {
-"reference": d.reference.serialize_reference(),
+"reference": encode_deadline_reference(d.reference),
"interval": d.interval.total_seconds(),
"callback": serialize(d.callback),
}
+_BUILTIN_DEADLINE_MODULES = (
+"airflow.sdk.definitions.deadline",
+"airflow.serialization.definitions.deadline",
+)
+
+
def encode_deadline_reference(ref) -> dict[str, Any]:
"""
Encode a deadline reference.
+For custom (non-builtin) deadline references, includes the class path
+so the decoder can import the user's class at runtime.
+
:meta private:
"""
-return ref.serialize_reference()
+from airflow._shared.module_loading import qualname
+
+serialized = ref.serialize_reference()
+
+# Custom types (not built-in) need __class_path so the decoder can import them.
+# Unlike built-in types which are looked up in SerializedReferenceModels,
+# custom types are discovered via import_string(__class_path) at deserialization time.
+module = type(ref).__module__
+if module not in _BUILTIN_DEADLINE_MODULES:
Review Comment:
Potential compatibility risk here: refs from `airflow.models.deadline` are
no longer treated as builtins, so they now rely on `__class_path` import during
decode. If that path resolves to nested classes (e.g. under `ReferenceModels`),
`import_string()` currently supports only top-level attributes, which can fail
deserialization. Should we either include `airflow.models.deadline` in builtins
or make import resolution nested-class aware?
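For the second option (nested-class-aware resolution), one approach is to try progressively shorter module prefixes and walk the remaining components with `getattr`. The sketch below is under stated assumptions. `import_nested` is a hypothetical helper, not Airflow's `import_string()`, and it is exercised on standard-library paths rather than real deadline classes.

```python
from __future__ import annotations

import importlib
from typing import Any


def import_nested(path: str) -> Any:
    """Import ``path``, allowing trailing components to be (nested) attributes."""
    parts = path.split(".")
    # Try the longest module prefix first, e.g. "pkg.mod.Outer.Inner" ->
    # "pkg.mod.Outer.Inner", "pkg.mod.Outer", "pkg.mod", "pkg".
    for split in range(len(parts), 0, -1):
        module_name = ".".join(parts[:split])
        try:
            obj: Any = importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            # Only skip prefixes that are not modules. A real module failing on
            # one of its own missing dependencies is re-raised.
            if exc.name is None or not (
                module_name == exc.name or module_name.startswith(exc.name + ".")
            ):
                raise
            continue
        # Walk the remaining components as attributes (nested classes included).
        for attr in parts[split:]:
            obj = getattr(obj, attr)
        return obj
    raise ImportError(f"cannot import {path!r}")
```

A path such as `airflow.models.deadline.ReferenceModels.DagRunLogicalDateDeadline` would then resolve the module `airflow.models.deadline` and fetch `ReferenceModels.DagRunLogicalDateDeadline` by attribute. On Python 3.9+, the standard library's `pkgutil.resolve_name()` applies a similar prefix-then-attribute strategy and may be preferable to a hand-rolled helper.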
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2871162165
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -259,7 +310,9 @@ class TYPES:
)
SerializedReferenceModels.TYPES.DAGRUN_QUEUED = (SerializedReferenceModels.DagRunQueuedAtDeadline,)
SerializedReferenceModels.TYPES.DAGRUN = (
-SerializedReferenceModels.TYPES.DAGRUN_CREATED + SerializedReferenceModels.TYPES.DAGRUN_QUEUED
+SerializedReferenceModels.TYPES.DAGRUN_CREATED
++ SerializedReferenceModels.TYPES.DAGRUN_QUEUED
++ (SerializedReferenceModels.SerializedCustomReference,)
)
Review Comment:
Handled in [comments from
tp](https://github.com/apache/airflow/pull/61461/commits/4394c58781d57bdf99d13ad3e18d5a5a404f9c34)
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,52 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""Wrapper for custom deadline references."""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
+filtered_kwargs = {k: v for k, v in kwargs.items() if k in self.required_kwargs}
Review Comment:
Good catch, handled in [pass to inner_ref through
kwargs](https://github.com/apache/airflow/pull/61461/commits/c65ae8a147bed7f962b622f78d51cef937ebef60)
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2871152953
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -259,7 +310,9 @@ class TYPES:
)
SerializedReferenceModels.TYPES.DAGRUN_QUEUED = (SerializedReferenceModels.DagRunQueuedAtDeadline,)
SerializedReferenceModels.TYPES.DAGRUN = (
-SerializedReferenceModels.TYPES.DAGRUN_CREATED + SerializedReferenceModels.TYPES.DAGRUN_QUEUED
+SerializedReferenceModels.TYPES.DAGRUN_CREATED
++ SerializedReferenceModels.TYPES.DAGRUN_QUEUED
++ (SerializedReferenceModels.SerializedCustomReference,)
)
Review Comment:
All good! It does the trick:
```python
mine = (
    *SerializedReferenceModels.TYPES.DAGRUN_CREATED,
    *SerializedReferenceModels.TYPES.DAGRUN_QUEUED,
    SerializedReferenceModels.SerializedCustomReference,
)
mine
Out[7]:
(airflow.serialization.definitions.deadline.SerializedReferenceModels.DagRunLogicalDateDeadline,
 airflow.serialization.definitions.deadline.SerializedReferenceModels.FixedDatetimeDeadline,
 airflow.serialization.definitions.deadline.SerializedReferenceModels.AverageRuntimeDeadline,
 airflow.serialization.definitions.deadline.SerializedReferenceModels.DagRunQueuedAtDeadline,
 airflow.serialization.definitions.deadline.SerializedReferenceModels.SerializedCustomReference)

mine == SerializedReferenceModels.TYPES.DAGRUN
Out[8]: True
```
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2871140716
##
airflow-core/src/airflow/models/deadline_alert.py:
##
@@ -86,6 +86,8 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
@property
def reference_class(self) -> type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
"""Return the deserialized reference class."""
+if "__class_path" in self.reference:
+return SerializedReferenceModels.SerializedCustomReference
Review Comment:
Hmm, what do you think of instead matching the reference type _against all the
other_ builtin types and, if none matches, returning a custom reference?
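A minimal sketch of the lookup proposed here: match against the registered builtin types first, and fall back to the custom wrapper only when nothing matches. The classes are empty stand-ins. Both the `reference_type` key and the `_BUILTIN_REFERENCES` registry are hypothetical and not taken from the PR.

```python
from __future__ import annotations

from typing import Any

# Hypothetical key name; the real field is whatever REFERENCE_TYPE_FIELD holds.
REFERENCE_TYPE_FIELD = "reference_type"


class SerializedBaseDeadlineReference:
    pass


class DagRunLogicalDateDeadline(SerializedBaseDeadlineReference):
    pass


class DagRunQueuedAtDeadline(SerializedBaseDeadlineReference):
    pass


class SerializedCustomReference(SerializedBaseDeadlineReference):
    pass


# Hypothetical registry of builtin types, keyed by the name they serialize under.
_BUILTIN_REFERENCES: dict[str, type[SerializedBaseDeadlineReference]] = {
    cls.__name__: cls for cls in (DagRunLogicalDateDeadline, DagRunQueuedAtDeadline)
}


def reference_class(reference: dict[str, Any]) -> type[SerializedBaseDeadlineReference]:
    """Prefer an exact builtin match; anything unrecognized is treated as custom."""
    builtin = _BUILTIN_REFERENCES.get(reference.get(REFERENCE_TYPE_FIELD, ""))
    return builtin if builtin is not None else SerializedCustomReference
```

One trade-off with this design: a payload that has neither a known type nor a `__class_path` also falls through to the custom wrapper. The custom decode path should therefore fail loudly when the class path is missing.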
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870768056
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -259,7 +310,9 @@ class TYPES:
)
SerializedReferenceModels.TYPES.DAGRUN_QUEUED = (SerializedReferenceModels.DagRunQueuedAtDeadline,)
SerializedReferenceModels.TYPES.DAGRUN = (
-SerializedReferenceModels.TYPES.DAGRUN_CREATED + SerializedReferenceModels.TYPES.DAGRUN_QUEUED
+SerializedReferenceModels.TYPES.DAGRUN_CREATED
++ SerializedReferenceModels.TYPES.DAGRUN_QUEUED
++ (SerializedReferenceModels.SerializedCustomReference,)
)
Review Comment:
Not sure if this would work:
```python
SerializedReferenceModels.TYPES.DAGRUN = (
    *SerializedReferenceModels.TYPES.DAGRUN_CREATED,
    *SerializedReferenceModels.TYPES.DAGRUN_QUEUED,
    SerializedReferenceModels.SerializedCustomReference,
)
```
Slightly more readable to me if it does.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870765936
##
airflow-core/src/airflow/models/deadline_alert.py:
##
@@ -86,6 +86,8 @@ def matches_definition(self, other: DeadlineAlert) -> bool:
@property
def reference_class(self) -> type[SerializedReferenceModels.SerializedBaseDeadlineReference]:
"""Return the deserialized reference class."""
+if "__class_path" in self.reference:
+return SerializedReferenceModels.SerializedCustomReference
Review Comment:
I wonder if there’s a better way to do this. (Not saying this is wrong; the
criterion is just a bit arbitrary and maybe too easy to break.)
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870619189
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -239,6 +250,52 @@ def serialize_reference(self) -> dict:
def deserialize_reference(cls, reference_data: dict):
return cls(max_runs=reference_data["max_runs"],
min_runs=reference_data.get("min_runs"))
+class SerializedCustomReference(SerializedBaseDeadlineReference):
+"""Wrapper for custom deadline references."""
+
+def __init__(self, inner_ref):
+self.inner_ref = inner_ref
+
+@property
+def reference_name(self) -> str:
+return self.inner_ref.reference_name
+
+def evaluate_with(self, *, session: Session, interval: timedelta, **kwargs: Any) -> datetime | None:
+filtered_kwargs = {k: v for k, v in kwargs.items() if k in self.required_kwargs}
Review Comment:
Good catch, handled in [comments from
kaxil](https://github.com/apache/airflow/pull/61461/commits/f78d6235c6400934175912142e29eafaf59421a9)
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -118,69 +223,64 @@ class TYPES:
# Deadlines that should be created when the DagRun is created.
DAGRUN_CREATED: DeadlineReferenceTypes = (
-ReferenceModels.DagRunLogicalDateDeadline,
-ReferenceModels.FixedDatetimeDeadline,
-ReferenceModels.AverageRuntimeDeadline,
+DagRunLogicalDateDeadline,
+FixedDatetimeDeadline,
+AverageRuntimeDeadline,
)
# Deadlines that should be created when the DagRun is queued.
-DAGRUN_QUEUED: DeadlineReferenceTypes = (ReferenceModels.DagRunQueuedAtDeadline,)
+DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
# All DagRun-related deadline types.
DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
-from airflow.models.deadline import ReferenceModels
-
-DAGRUN_LOGICAL_DATE: DeadlineReferenceType = ReferenceModels.DagRunLogicalDateDeadline()
-DAGRUN_QUEUED_AT: DeadlineReferenceType = ReferenceModels.DagRunQueuedAtDeadline()
+DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
@classmethod
def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) -> DeadlineReferenceType:
if max_runs == 0:
-max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
if min_runs is None:
min_runs = max_runs
-return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+return AverageRuntimeDeadline(max_runs, min_runs)
@classmethod
-def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
-return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+return FixedDatetimeDeadline(dt)
# TODO: Remove this once other deadline types exist.
# This is a temporary reference type used only in tests to verify that
# dag.has_dagrun_deadline() returns false if the dag has a non-dagrun deadline type.
# It should be replaced with a real non-dagrun deadline type when one is available.
_TEMPORARY_TEST_REFERENCE = type(
"TemporaryTestDeadlineForTypeChecking",
-(DeadlineReferenceType,),
-{"_evaluate_with": lambda self, **kwargs: datetime.now()},
+(BaseDeadlineReference,),
+{"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD: "TemporaryTestDeadlineForTypeChecking"}},
)()
@classmethod
def register_custom_reference(
cls,
-reference_class: type[ReferenceModels.BaseDeadlineReference],
+reference_class: type[BaseDeadlineReference],
deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> type[ReferenceModels.BaseDeadlineReference]:
+) -> type[BaseDeadlineReference]:
"""
Register a custom deadline reference class.
:param reference_class: The custom reference class inheriting from
BaseDeadlineReference
:param deadline_reference_type: A DeadlineReference.TYPES for when the
deadline should be evaluated ("DAGRUN_CREATED",
"DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
"""
-from airflow.models.deadline import ReferenceModels
-
# Default to DAGRUN_CREATED if no deadline_reference_type specified
if deadline_reference_type is None:
deadline_reference_type = cls.TYPES.DAGRUN_CREATED
# Validate the reference class inherits from BaseDeadlineReference
-if not issubclass(reference_class,
ReferenceModels.BaseDeadli
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2870589608
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -118,69 +223,64 @@ class TYPES:
# Deadlines that should be created when the DagRun is created.
DAGRUN_CREATED: DeadlineReferenceTypes = (
-ReferenceModels.DagRunLogicalDateDeadline,
-ReferenceModels.FixedDatetimeDeadline,
-ReferenceModels.AverageRuntimeDeadline,
+DagRunLogicalDateDeadline,
+FixedDatetimeDeadline,
+AverageRuntimeDeadline,
)
# Deadlines that should be created when the DagRun is queued.
-DAGRUN_QUEUED: DeadlineReferenceTypes = (ReferenceModels.DagRunQueuedAtDeadline,)
+DAGRUN_QUEUED: DeadlineReferenceTypes = (DagRunQueuedAtDeadline,)
# All DagRun-related deadline types.
DAGRUN: DeadlineReferenceTypes = DAGRUN_CREATED + DAGRUN_QUEUED
-from airflow.models.deadline import ReferenceModels
-
-DAGRUN_LOGICAL_DATE: DeadlineReferenceType = ReferenceModels.DagRunLogicalDateDeadline()
-DAGRUN_QUEUED_AT: DeadlineReferenceType = ReferenceModels.DagRunQueuedAtDeadline()
+DAGRUN_LOGICAL_DATE: DeadlineReferenceType = DagRunLogicalDateDeadline()
+DAGRUN_QUEUED_AT: DeadlineReferenceType = DagRunQueuedAtDeadline()
@classmethod
def AVERAGE_RUNTIME(cls, max_runs: int = 0, min_runs: int | None = None) -> DeadlineReferenceType:
if max_runs == 0:
-max_runs = cls.ReferenceModels.AverageRuntimeDeadline.DEFAULT_LIMIT
+max_runs = AverageRuntimeDeadline.DEFAULT_LIMIT
if min_runs is None:
min_runs = max_runs
-return cls.ReferenceModels.AverageRuntimeDeadline(max_runs, min_runs)
+return AverageRuntimeDeadline(max_runs, min_runs)
@classmethod
-def FIXED_DATETIME(cls, datetime: datetime) -> DeadlineReferenceType:
-return cls.ReferenceModels.FixedDatetimeDeadline(datetime)
+def FIXED_DATETIME(cls, dt: datetime) -> DeadlineReferenceType:
+return FixedDatetimeDeadline(dt)
# TODO: Remove this once other deadline types exist.
# This is a temporary reference type used only in tests to verify that
# dag.has_dagrun_deadline() returns false if the dag has a non-dagrun deadline type.
# It should be replaced with a real non-dagrun deadline type when one is available.
_TEMPORARY_TEST_REFERENCE = type(
"TemporaryTestDeadlineForTypeChecking",
-(DeadlineReferenceType,),
-{"_evaluate_with": lambda self, **kwargs: datetime.now()},
+(BaseDeadlineReference,),
+{"serialize_reference": lambda self: {REFERENCE_TYPE_FIELD: "TemporaryTestDeadlineForTypeChecking"}},
)()
@classmethod
def register_custom_reference(
cls,
-reference_class: type[ReferenceModels.BaseDeadlineReference],
+reference_class: type[BaseDeadlineReference],
deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> type[ReferenceModels.BaseDeadlineReference]:
+) -> type[BaseDeadlineReference]:
"""
Register a custom deadline reference class.
:param reference_class: The custom reference class inheriting from
BaseDeadlineReference
:param deadline_reference_type: A DeadlineReference.TYPES for when the
deadline should be evaluated ("DAGRUN_CREATED",
"DAGRUN_QUEUED", etc.); defaults to
DeadlineReference.TYPES.DAGRUN_CREATED
"""
-from airflow.models.deadline import ReferenceModels
-
# Default to DAGRUN_CREATED if no deadline_reference_type specified
if deadline_reference_type is None:
deadline_reference_type = cls.TYPES.DAGRUN_CREATED
# Validate the reference class inherits from BaseDeadlineReference
-if not issubclass(reference_class, ReferenceModels.BaseDeadlineReference):
+if not issubclass(reference_class, BaseDeadlineReference):
Review Comment:
Good call! It might break for such users; let me add a compat shim. I was
under the impression that, since this is an experimental feature, we would be
OK to do it, but maybe that's not a great idea!
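One common shape for such a shim, sketched under assumptions: keep the old `ReferenceModels.*` names as deprecated aliases of the relocated SDK classes. Existing custom references that subclass the old base then still pass `issubclass(reference_class, BaseDeadlineReference)`. The namespace below relies on a module-level `__getattr__` (PEP 562). `BaseDeadlineReference` here is a local stand-in, and `make_legacy_namespace`, `_MOVED`, and the warning text are all hypothetical.

```python
import types
import warnings


class BaseDeadlineReference:
    """Local stand-in for the relocated SDK base class."""


# Hypothetical mapping: old attribute name -> new object.
_MOVED = {"BaseDeadlineReference": BaseDeadlineReference}


def make_legacy_namespace(name: str) -> types.ModuleType:
    """Build a namespace whose old attributes resolve to the new classes with a warning."""
    namespace = types.ModuleType(name)

    def __getattr__(attr: str):
        if attr in _MOVED:
            warnings.warn(
                f"{name}.{attr} is deprecated; import it from the task SDK instead",
                DeprecationWarning,
                stacklevel=2,
            )
            return _MOVED[attr]
        raise AttributeError(f"module {name!r} has no attribute {attr!r}")

    # PEP 562: a module's __getattr__ is consulted when normal lookup fails.
    namespace.__getattr__ = __getattr__
    return namespace


ReferenceModels = make_legacy_namespace("ReferenceModels")
```

Because the alias returns the very same class object, old subclasses satisfy the new `issubclass` check without any change to the validation code, and users get a deprecation signal instead of a hard break.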
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2815482327
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -226,7 +224,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
return None
avg_duration_seconds = sum(durations) / len(durations)
-return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
+return timezone.utcnow() + timedelta(seconds=float(avg_duration_seconds))
Review Comment:
Now I remember why. I saw this failure:
```python
airflow-core/tests/unit/models/test_deadline.py:480: in test_average_runtime_with_min_runs
    result = reference.evaluate_with(session=session, interval=interval, dag_id=DAG_ID)
airflow-core/src/airflow/serialization/definitions/deadline.py:104: in evaluate_with
    base_time = self._evaluate_with(session=session, **filtered_kwargs)
airflow-core/src/airflow/utils/session.py:98: in wrapper
    return func(*args, **kwargs)
airflow-core/src/airflow/serialization/definitions/deadline.py:229: in _evaluate_with
    return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
E   TypeError: unsupported type for timedelta seconds component: decimal.Decimal
```
But I am handling it exactly the way models/deadline does:
https://github.com/apache/airflow/blob/main/airflow-core/src/airflow/models/deadline.py#L417-L429
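The failure above can be reproduced without Airflow. When the durations arrive as `decimal.Decimal` (as the traceback shows; presumably because the database returns a numeric column), their mean is also a `Decimal`, and `timedelta()` rejects it. Converting with `float()` fixes this. The sketch below is a minimal, standalone version of that fix. `average_runtime_deadline` is a hypothetical helper, not Airflow's `_evaluate_with`.

```python
from __future__ import annotations

from datetime import datetime, timedelta, timezone
from decimal import Decimal


def average_runtime_deadline(durations: list[Decimal], now: datetime) -> datetime | None:
    """Hypothetical helper: ``now`` plus the mean of ``durations`` (in seconds)."""
    if not durations:
        return None
    # Averaging Decimal values yields a Decimal, which timedelta() rejects.
    avg_duration_seconds = sum(durations) / len(durations)
    return now + timedelta(seconds=float(avg_duration_seconds))
```

The `float()` conversion loses Decimal's exactness, but at sub-microsecond magnitudes that is well below `timedelta`'s microsecond resolution, so it does not matter for scheduling a deadline.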
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh closed pull request #61461: Decouple deadline reference types from core in task SDK
URL: https://github.com/apache/airflow/pull/61461
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2815267654
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -226,7 +224,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
return None
avg_duration_seconds = sum(durations) / len(durations)
-return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
+return timezone.utcnow() + timedelta(seconds=float(avg_duration_seconds))
Review Comment:
Actually, I do not remember why I made that change; I'm just reverting it, since
it seems unnecessary.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2815258094
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -193,19 +193,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
     from airflow.sdk.serde import serialize
     return {
-        "reference": d.reference.serialize_reference(),
+        "reference": encode_deadline_reference(d.reference),
         "interval": d.interval.total_seconds(),
         "callback": serialize(d.callback),
     }
+_BUILTIN_DEADLINE_MODULES = (
+    "airflow.sdk.definitions.deadline",
+    "airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
     """
     Encode a deadline reference.
+    For custom (non-builtin) deadline references, includes the class path
+    so the decoder can import the user's class at runtime.
+
     :meta private:
     """
-    return ref.serialize_reference()
+    from airflow._shared.module_loading import qualname
+
+    serialized = ref.serialize_reference()
+
+    # Custom types (not built-in) need __class_path so the decoder can import them.
+    # Unlike built-in types which are looked up in SerializedReferenceModels,
+    # custom types are discovered via import_string(__class_path) at deserialization time.
+    module = type(ref).__module__
+    if module not in _BUILTIN_DEADLINE_MODULES:
+        serialized["__class_path"] = qualname(type(ref))
Review Comment:
You're right, it does. Handled it
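A minimal standalone sketch of the encode/decode round trip the hunk describes: builtin references are resolved by name, while custom ones carry a `__class_path` that is imported at deserialization time. `BUILTIN_MODULES`, `class_path`, `import_string`, `encode_reference`, `decode_reference` and the example classes are hypothetical stand-ins written for illustration; they are not Airflow's `qualname`/`import_string` helpers, whose exact behaviour is not reproduced here.

```python
from __future__ import annotations

import importlib
from typing import Any

# Hypothetical allow-list mirroring _BUILTIN_DEADLINE_MODULES in the hunk above.
BUILTIN_MODULES = (
    "airflow.sdk.definitions.deadline",
    "airflow.serialization.definitions.deadline",
)


def class_path(cls: type) -> str:
    # Top-level classes only: a nested __qualname__ such as "Outer.Inner"
    # would not round-trip through import_string below.
    return f"{cls.__module__}.{cls.__qualname__}"


def import_string(path: str) -> Any:
    """Import 'package.module.Name' and return the named attribute."""
    module_name, _, attr = path.rpartition(".")
    return getattr(importlib.import_module(module_name), attr)


def encode_reference(ref: Any) -> dict[str, Any]:
    serialized = ref.serialize_reference()
    # Only references defined outside the builtin modules carry a class path.
    if type(ref).__module__ not in BUILTIN_MODULES:
        serialized["__class_path"] = class_path(type(ref))
    return serialized


def decode_reference(data: dict[str, Any], builtin_types: dict[str, type]) -> Any:
    if "__class_path" in data:
        # Custom reference: import the user's class at deserialization time.
        cls = import_string(data["__class_path"])
    else:
        # Builtin reference: look it up by its reference_type name.
        cls = builtin_types[data["reference_type"]]
    return cls.deserialize_reference(data)


class Reference:
    def serialize_reference(self) -> dict[str, Any]:
        return {"reference_type": type(self).__name__}

    @classmethod
    def deserialize_reference(cls, data: dict[str, Any]) -> Reference:
        return cls()


class SimulatedBuiltin(Reference):
    # Pretend this class lives in the SDK module so the allow-list matches.
    __module__ = "airflow.sdk.definitions.deadline"


class MyCustomReference(Reference):
    pass


print(encode_reference(SimulatedBuiltin()))  # {'reference_type': 'SimulatedBuiltin'}
print(encode_reference(MyCustomReference()))  # also carries '__class_path'
```

Keying the check on the defining module keeps the decision in core: a user class only avoids `__class_path` if it really lives in one of the listed modules.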
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3896826246
Good question. It led me to do some more testing, which revealed that deadline alerts with custom refs broke after my last PR: https://github.com/apache/airflow/pull/61118. I reworked this one, made some changes, and pushed them; that's reflected in the PR description. Let me know what you think.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
ferruzzi commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3887841391
I don't understand how this is intended to work. Can you show me what a dag definition looks like now? Also, what is the process for adding a new reference now? Do we still add the logic in the models path, but now also add the empty class signature in the definitions file?
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
ferruzzi commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2796084465
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -203,29 +303,36 @@ def register_custom_reference(
 def deadline_reference(
     deadline_reference_type: DeadlineReferenceTypes | None = None,
-) -> Callable[[type[ReferenceModels.BaseDeadlineReference]], type[ReferenceModels.BaseDeadlineReference]]:
+) -> Callable[[type[BaseDeadlineReference]], type[BaseDeadlineReference]]:
     """
     Decorate a class to register a custom deadline reference.
     Usage:
         @deadline_reference()
-        class MyCustomReference(ReferenceModels.BaseDeadlineReference):
+        class MyCustomReference(BaseDeadlineReference):
             # By default, evaluate_with will be called when a new dagrun is created.
             def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
-                # Put your business logic here
+                # Put your business logic here (use deferred imports for Core types)
+                from airflow.models import DagRun
                 return some_datetime
+            def serialize_reference(self) -> dict:
+                return {"reference_type": self.reference_name}
+
         @deadline_reference(DeadlineReference.TYPES.DAGRUN_QUEUED)
-        class MyQueuedRef(ReferenceModels.BaseDeadlineReference):
+        class MyQueuedRef(BaseDeadlineReference):
             # Optionally, you can specify when you want it calculated by providing a DeadlineReference.TYPES
             def _evaluate_with(self, *, session: Session, **kwargs) -> datetime:
                 # Put your business logic here
Review Comment:
I assume you want to put the same note about imports as you did in the other example?
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3882458691
I think this is more or less good now, with a couple of minor suggestions. @ferruzzi, I would also appreciate it if you could give some perspective on whether this would step on your work too much.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2791547162
##
airflow-core/src/airflow/serialization/encoders.py:
##
@@ -193,19 +193,39 @@ def encode_deadline_alert(d: DeadlineAlert | SerializedDeadlineAlert) -> dict[st
     from airflow.sdk.serde import serialize
     return {
-        "reference": d.reference.serialize_reference(),
+        "reference": encode_deadline_reference(d.reference),
         "interval": d.interval.total_seconds(),
         "callback": serialize(d.callback),
     }
+_BUILTIN_DEADLINE_MODULES = (
+    "airflow.sdk.definitions.deadline",
+    "airflow.serialization.definitions.deadline",
+)
+
+
 def encode_deadline_reference(ref) -> dict[str, Any]:
     """
     Encode a deadline reference.
+    For custom (non-builtin) deadline references, includes the class path
+    so the decoder can import the user's class at runtime.
+
     :meta private:
     """
-    return ref.serialize_reference()
+    from airflow._shared.module_loading import qualname
+
+    serialized = ref.serialize_reference()
+
+    # Custom types (not built-in) need __class_path so the decoder can import them.
+    # Unlike built-in types which are looked up in SerializedReferenceModels,
+    # custom types are discovered via import_string(__class_path) at deserialization time.
+    module = type(ref).__module__
+    if module not in _BUILTIN_DEADLINE_MODULES:
+        serialized["__class_path"] = qualname(type(ref))
Review Comment:
```suggestion
        serialized["__class_path"] = qualname(ref)
```
IIRC, `qualname` handles this automatically.
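The suggestion relies on the helper accepting an instance as well as a class. A standalone sketch of that normalization, under the assumption that this is the behaviour being recalled; `class_path` is a hypothetical helper, not Airflow's `qualname`:

```python
from __future__ import annotations


def class_path(obj: object) -> str:
    """Return 'module.QualName' for a class, or for the class of an instance."""
    cls = obj if isinstance(obj, type) else type(obj)
    return f"{cls.__module__}.{cls.__qualname__}"


class MyCustomReference:
    pass


ref = MyCustomReference()
# Both spellings from the suggestion resolve to the same path.
print(class_path(ref) == class_path(type(ref)))  # True
```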
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2791545765
##
airflow-core/src/airflow/serialization/definitions/deadline.py:
##
@@ -226,7 +224,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None:
             return None
         avg_duration_seconds = sum(durations) / len(durations)
-        return timezone.utcnow() + timedelta(seconds=avg_duration_seconds)
+        return timezone.utcnow() + timedelta(seconds=float(avg_duration_seconds))
Review Comment:
Why is this cast needed?
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2787214241
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 logger = logging.getLogger(__name__)
-DeadlineReferenceTypes: TypeAlias = tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+    """
+    Base class for all Deadline Reference implementations.
+
+    This is a lightweight SDK class for DAG authoring. It only handles serialization.
+    The actual evaluation logic (_evaluate_with) is in Core's SerializedReferenceModels.
+
+    For custom deadline references, users should inherit from this class and implement
+    _evaluate_with() with deferred Core imports (imports inside the method body).
+    """
+
+    # way to detect builtin types. custom types inherit False while builtins set this to True
+    __is_builtin__: bool = False
+
+    @property
+    def reference_name(self) -> str:
+        """Return the class name as the reference identifier."""
+        return self.__class__.__name__
+
+    def serialize_reference(self) -> dict[str, Any]:
+        """
+        Serialize this reference type into a dictionary representation.
+
+        Override this method in subclasses if additional data is needed for serialization.
+        """
+        return {REFERENCE_TYPE_FIELD: self.reference_name}
+
+    @classmethod
+    def deserialize_reference(cls, reference_data: dict[str, Any]) -> BaseDeadlineReference:
+        """
+        Deserialize a reference type from its dictionary representation.
+
+        :param reference_data: Dictionary containing serialized reference data.
+        """
+        return cls()
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, BaseDeadlineReference):
+            return NotImplemented
+        return self.serialize_reference() == other.serialize_reference()
+
+    def __hash__(self) -> int:
+        return hash(frozenset(self.serialize_reference().items()))
+
+
+class DagRunLogicalDateDeadline(BaseDeadlineReference):
+    """A deadline that returns a DagRun's logical date."""
+
+    __is_builtin__ = True
+
+    def serialize_reference(self) -> dict[str, Any]:
+        return {REFERENCE_TYPE_FIELD: self.reference_name}
Review Comment:
I just dropped the commit and am removing inheritance from child classes where it is not needed: fc38cafb2d
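The point that the `DagRunLogicalDateDeadline` override is redundant can be checked with a trimmed standalone copy of the base class from the hunk: a subclass that does not override `serialize_reference` still serializes to `{"reference_type": "<ClassName>"}`, and `__eq__`/`__hash__` keep working. This is an illustrative sketch; both subclasses are shown without their Core evaluation logic.

```python
from __future__ import annotations

from abc import ABC
from typing import Any

REFERENCE_TYPE_FIELD = "reference_type"


class BaseDeadlineReference(ABC):
    @property
    def reference_name(self) -> str:
        return self.__class__.__name__

    def serialize_reference(self) -> dict[str, Any]:
        return {REFERENCE_TYPE_FIELD: self.reference_name}

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, BaseDeadlineReference):
            return NotImplemented
        return self.serialize_reference() == other.serialize_reference()

    def __hash__(self) -> int:
        # Requires every serialized value to be hashable.
        return hash(frozenset(self.serialize_reference().items()))


# No serialize_reference override: the inherited one already uses the class name.
class DagRunLogicalDateDeadline(BaseDeadlineReference):
    """A deadline that returns a DagRun's logical date."""


class DagRunQueuedAtDeadline(BaseDeadlineReference):
    """Second builtin named in the review; evaluation logic omitted."""


print(DagRunLogicalDateDeadline().serialize_reference())
# {'reference_type': 'DagRunLogicalDateDeadline'}
print(DagRunLogicalDateDeadline() == DagRunLogicalDateDeadline())  # True
print(DagRunLogicalDateDeadline() == DagRunQueuedAtDeadline())  # False
print(len({DagRunLogicalDateDeadline(), DagRunLogicalDateDeadline()}))  # 1
```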
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781468006
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
Review Comment:
Just tried to do it in [comments from tp](https://github.com/apache/airflow/pull/61461/commits/33fcd264b8bfbff9160b95de88c87a39d8d0c9b9)
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781444229
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
Review Comment:
Interesting perspective about the serializer injecting it. Yes, we should be able to, since the serializer already owns the serialization flow.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781426429
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
Review Comment:
Umm, yes, you are right; we should be able to safely remove it.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781372150
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 logger = logging.getLogger(__name__)
-DeadlineReferenceTypes: TypeAlias = tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+    """
+    Base class for all Deadline Reference implementations.
+
+    This is a lightweight SDK class for DAG authoring. It only handles serialization.
+    The actual evaluation logic (_evaluate_with) is in Core's SerializedReferenceModels.
+
+    For custom deadline references, users should inherit from this class and implement
+    _evaluate_with() with deferred Core imports (imports inside the method body).
+    """
+
+    # way to detect builtin types. custom types inherit False while builtins set this to True
+    __is_builtin__: bool = False
Review Comment:
Umm, what if I add it in the encoder instead?
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781304639
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
Review Comment:
I wonder if `{REFERENCE_TYPE_FIELD: self.reference_name}` should be injected automatically in the serializer so subclasses don’t need to repeat it.
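One possible shape for that suggestion, as a hedged sketch: the encoder owns the `reference_type` key and subclasses return only their extra fields. `encode_reference` and `AverageRuntimeLikeDeadline` are hypothetical names; the `max_runs`/`min_runs` fields mirror the average-runtime reference elsewhere in this PR.

```python
from __future__ import annotations

from typing import Any

REFERENCE_TYPE_FIELD = "reference_type"


class BaseDeadlineReference:
    @property
    def reference_name(self) -> str:
        return type(self).__name__

    def serialize_reference(self) -> dict[str, Any]:
        # Subclasses return only their own extra fields; the default is none.
        return {}


class DagRunLogicalDateDeadline(BaseDeadlineReference):
    pass


class AverageRuntimeLikeDeadline(BaseDeadlineReference):
    def __init__(self, max_runs: int, min_runs: int | None = None) -> None:
        self.max_runs = max_runs
        self.min_runs = min_runs

    def serialize_reference(self) -> dict[str, Any]:
        return {"max_runs": self.max_runs, "min_runs": self.min_runs}


def encode_reference(ref: BaseDeadlineReference) -> dict[str, Any]:
    # The serializer injects the type field, so no subclass has to repeat it.
    # It is written last, so a subclass cannot accidentally overwrite it.
    return {**ref.serialize_reference(), REFERENCE_TYPE_FIELD: ref.reference_name}


print(encode_reference(DagRunLogicalDateDeadline()))
# {'reference_type': 'DagRunLogicalDateDeadline'}
print(encode_reference(AverageRuntimeLikeDeadline(max_runs=10, min_runs=3)))
# {'max_runs': 10, 'min_runs': 3, 'reference_type': 'AverageRuntimeLikeDeadline'}
```

Custom references written against the current contract, which already include the key themselves, would keep producing the same output, because the injected value carries the same class name.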
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781300720
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
Review Comment:
This does not seem needed? It’s exactly the same as in the base. (Same for DagRunQueuedAtDeadline.)
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
uranusjr commented on code in PR #61461:
URL: https://github.com/apache/airflow/pull/61461#discussion_r2781295991
##
task-sdk/src/airflow/sdk/definitions/deadline.py:
##
@@ -29,7 +30,128 @@
 logger = logging.getLogger(__name__)
-DeadlineReferenceTypes: TypeAlias = tuple[type[ReferenceModels.BaseDeadlineReference], ...]
+# Field name used in serialization - must be in sync with SerializedReferenceModels.REFERENCE_TYPE_FIELD
+REFERENCE_TYPE_FIELD = "reference_type"
+
+
+class BaseDeadlineReference(ABC):
+    """
+    Base class for all Deadline Reference implementations.
+
+    This is a lightweight SDK class for DAG authoring. It only handles serialization.
+    The actual evaluation logic (_evaluate_with) is in Core's SerializedReferenceModels.
+
+    For custom deadline references, users should inherit from this class and implement
+    _evaluate_with() with deferred Core imports (imports inside the method body).
+    """
+
+    # way to detect builtin types. custom types inherit False while builtins set this to True
+    __is_builtin__: bool = False
Review Comment:
This feels a bit too fragile to me. It’s probably better to maintain a list somewhere in core that the user can’t simply overwrite.
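To make the fragility concern concrete, a minimal standalone sketch: a class-level flag such as `__is_builtin__` can be set to `True` by any user subclass, while a core-side registry keyed by class identity is not affected by attributes a subclass sets or by reusing a class name. `CORE_BUILTIN_REFERENCES` and both `is_builtin_*` helpers are hypothetical, and this is one possible shape of "a list in core", not what the PR ended up with.

```python
from __future__ import annotations


class BaseDeadlineReference:
    # Flag-based detection, as in the hunk above: custom types inherit False.
    __is_builtin__: bool = False


class DagRunLogicalDateDeadline(BaseDeadlineReference):
    __is_builtin__ = True


class UserReference(BaseDeadlineReference):
    # Nothing prevents user code from claiming to be builtin.
    __is_builtin__ = True


# Hypothetical core-side registry: membership is by class identity, so a user
# class cannot join it by setting an attribute or reusing a class name.
CORE_BUILTIN_REFERENCES: frozenset[type] = frozenset({DagRunLogicalDateDeadline})


def is_builtin_by_flag(ref: BaseDeadlineReference) -> bool:
    return type(ref).__is_builtin__


def is_builtin_by_registry(ref: BaseDeadlineReference) -> bool:
    return type(ref) in CORE_BUILTIN_REFERENCES


for ref in (DagRunLogicalDateDeadline(), UserReference()):
    print(type(ref).__name__, is_builtin_by_flag(ref), is_builtin_by_registry(ref))
# DagRunLogicalDateDeadline True True
# UserReference True False
```

Note that an exact-type check also treats a user subclass of a builtin as custom, which is usually the safer default.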
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3869930729
There are some failing checks, which I am looking into.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3867524395
cc: @ferruzzi this is related to the discussion thread on Slack.
Re: [PR] Decouple deadline reference types from core in task SDK [airflow]
amoghrajesh commented on PR #61461:
URL: https://github.com/apache/airflow/pull/61461#issuecomment-3848880058
Still a draft; sent it out to run CI early.
