Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-11 Thread via GitHub


uranusjr merged PR #52234:
URL: https://github.com/apache/airflow/pull/52234


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-10 Thread via GitHub


ashb commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2198056662


##
airflow-core/src/airflow/serialization/serialized_objects.py:
##
@@ -738,7 +752,7 @@ def serialize(
 return var.to_dict()
 elif isinstance(var, MappedOperator):
 return 
cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)
-elif isinstance(var, TaskSDKBaseOperator):
+elif isinstance(var, (BaseOperator, SerializedBaseOperator)):

Review Comment:
   I'd say keep it for now in this PR, but remove it. This isn't useful anymore.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-10 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2196869199


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +146,87 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# Still accept TaskSDKBaseOperator because some tests don't go through 
serialization.
+# TODO (GH-52141): Rewrite tests so we can drop SDK references at some point.
+@get_mapped_ti_count.register(SerializedBaseOperator)
+@get_mapped_ti_count.register(TaskSDKBaseOperator)
+def _(task: SerializedBaseOperator | TaskSDKBaseOperator, run_id: str, *, 
session: Session) -> int:

Review Comment:
   Turns out `singledispatch` does not support unions so we’ll need to keep 
this verbose syntax a bit longer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-10 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2196808002


##
airflow-core/src/airflow/serialization/serialized_objects.py:
##
@@ -1281,11 +1357,11 @@ def serialize_mapped_operator(cls, op: MappedOperator) 
-> dict[str, Any]:
 return serialized_op
 
 @classmethod
-def serialize_operator(cls, op: TaskSDKBaseOperator | MappedOperator) -> 
dict[str, Any]:
+def serialize_operator(cls, op: BaseOperator | MappedOperator | 
SerializedBaseOperator) -> dict[str, Any]:

Review Comment:
   See https://github.com/apache/airflow/pull/52234#discussion_r2196630623



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-09 Thread via GitHub


uranusjr commented on PR #52234:
URL: https://github.com/apache/airflow/pull/52234#issuecomment-3055725470

   Serialised dag JSON does not change; this only removes the JSON → 
SerializedBaseOperator part; the SDK operator → JSON part is not affected.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-09 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2196671637


##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -881,7 +886,11 @@ def refresh_from_db(
 else:
 self.state = None
 
-def refresh_from_task(self, task: Operator, pool_override: str | None = 
None) -> None:
+def refresh_from_task(
+self,
+task: Operator | SerializedBaseOperator,

Review Comment:
   A thousand tests and some (probably unused) code paths. It’s probably better 
to have a separate PR focusing just on pruning those.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-09 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2196653837


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +151,88 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# https://github.com/python/cpython/issues/86153
+# While we support Python 3.9 we can't rely on the type hint, we need to pass 
the type explicitly to
+# register.
+@get_mapped_ti_count.register(SerializedBaseOperator)
+@get_mapped_ti_count.register(TaskSDKBaseOperator)  # Some tests don't go 
through task serialization...
+def _(task: SerializedBaseOperator | TaskSDKBaseOperator, run_id: str, *, 
session: Session) -> int:
+group = task.get_closest_mapped_task_group()
+if group is None:
+raise NotMapped()
+return get_mapped_ti_count(group, run_id, session=session)
+
+
+@get_mapped_ti_count.register(MappedOperator)
+@get_mapped_ti_count.register(TaskSDKMappedOperator)  # Some tests don't go 
through task serialization...
+def _(task: MappedOperator | TaskSDKMappedOperator, run_id: str, *, session: 
Session) -> int:
+from airflow.serialization.serialized_objects import BaseSerialization, 
_ExpandInputRef
+
+exp_input = task._get_specified_expand_input()
+if isinstance(exp_input, _ExpandInputRef):
+exp_input = exp_input.deref(task.dag)
+# TODO: TaskSDK This is only needed to support `dag.test()` etc until we 
port it over to use the
+# task sdk runner.
+if not hasattr(exp_input, "get_total_map_length"):

Review Comment:
   It turns out we still need this code path for CLI. I’ve updated the comment 
to remove the 'only needed for dag.test()' part.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-09 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2196630623


##
airflow-core/src/airflow/serialization/serialized_objects.py:
##
@@ -738,7 +752,7 @@ def serialize(
 return var.to_dict()
 elif isinstance(var, MappedOperator):
 return 
cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)
-elif isinstance(var, TaskSDKBaseOperator):
+elif isinstance(var, (BaseOperator, SerializedBaseOperator)):

Review Comment:
   We have explicit tests ensuring double-serializing should return a correct 
result, this keeps the test successful. I don’t know the idea behind that but 
this is needed to keep CI green.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-09 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2195803574


##
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -110,7 +109,7 @@ def _fill_task_group_map(
 _fill_task_group_map(task_node=child, parent_node=task_node)
 return
 
-if isinstance(task_node, BaseOperator):
+if isinstance(task_node, (BaseOperator, SerializedBaseOperator)):

Review Comment:
   I tried to remove a few of BaseOperator checks and tests seem to be happy… 
I’m going to continue doing that until something barks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-08 Thread via GitHub


ashb commented on PR #52234:
URL: https://github.com/apache/airflow/pull/52234#issuecomment-3048335673

   I want to take a look at what the serialized Dag JSON looks like with this 
change too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-08 Thread via GitHub


ashb commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2192051819


##
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -110,7 +109,7 @@ def _fill_task_group_map(
 _fill_task_group_map(task_node=child, parent_node=task_node)
 return
 
-if isinstance(task_node, BaseOperator):
+if isinstance(task_node, (BaseOperator, SerializedBaseOperator)):

Review Comment:
   This can't ever be a `BaseOperator` can it?



##
airflow-core/src/airflow/api_fastapi/core_api/services/ui/grid.py:
##
@@ -344,8 +345,8 @@ def _get_aggs_for_node(detail):
 
 
 def _find_aggregates(
-node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap,
-parent_node: TaskGroup | BaseOperator | MappedTaskGroup | TaskMap | None,
+node: TaskGroup | BaseOperator | MappedTaskGroup | SerializedBaseOperator 
| TaskMap,
+parent_node: TaskGroup | BaseOperator | MappedTaskGroup | 
SerializedBaseOperator | TaskMap | None,

Review Comment:
   Ditto, it's not possible for these to be `BaseOperator`?



##
airflow-core/src/airflow/serialization/serialized_objects.py:
##
@@ -738,7 +752,7 @@ def serialize(
 return var.to_dict()
 elif isinstance(var, MappedOperator):
 return 
cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)
-elif isinstance(var, TaskSDKBaseOperator):
+elif isinstance(var, (BaseOperator, SerializedBaseOperator)):

Review Comment:
   I don't think it makes sense to serialized an already Serialized operator, 
does it?



##
airflow-core/src/airflow/serialization/serialized_objects.py:
##
@@ -1281,11 +1357,11 @@ def serialize_mapped_operator(cls, op: MappedOperator) 
-> dict[str, Any]:
 return serialized_op
 
 @classmethod
-def serialize_operator(cls, op: TaskSDKBaseOperator | MappedOperator) -> 
dict[str, Any]:
+def serialize_operator(cls, op: BaseOperator | MappedOperator | 
SerializedBaseOperator) -> dict[str, Any]:

Review Comment:
   Ditto.



##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -881,7 +886,11 @@ def refresh_from_db(
 else:
 self.state = None
 
-def refresh_from_task(self, task: Operator, pool_override: str | None = 
None) -> None:
+def refresh_from_task(
+self,
+task: Operator | SerializedBaseOperator,

Review Comment:
   And here -- when is this used with anything other than a serialized op?



##
airflow-core/src/airflow/serialization/serialized_objects.py:
##
@@ -116,6 +122,14 @@
 except ImportError:
 pass
 
+DEFAULT_OPERATOR_DEPS = {

Review Comment:
   Nit: frozenset?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2191509304


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +151,88 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# https://github.com/python/cpython/issues/86153
+# While we support Python 3.9 we can't rely on the type hint, we need to pass 
the type explicitly to

Review Comment:
   Let me try to change this in main too.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2191503563


##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -1898,22 +1897,23 @@ def get_template_context(
 :param session: SQLAlchemy ORM Session
 :param ignore_param_exceptions: flag to suppress value exceptions 
while initializing the ParamsDict
 """
-if TYPE_CHECKING:
-assert self.task
-assert isinstance(self.task.dag, SchedulerDAG)
-
 # Do not use provide_session here -- it expunges everything on exit!
 if not session:
 session = settings.Session()
 
+if TYPE_CHECKING:
+assert session
+assert isinstance(self.task, (BaseOperator, MappedOperator))

Review Comment:
   Mypy (1.9 where we’re on) has some bugs regarding this. 
https://github.com/python/mypy/pull/17371
   
   This will need to wait for https://github.com/apache/airflow/pull/52997



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2191499914


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +151,88 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# https://github.com/python/cpython/issues/86153
+# While we support Python 3.9 we can't rely on the type hint, we need to pass 
the type explicitly to
+# register.
+@get_mapped_ti_count.register(SerializedBaseOperator)
+@get_mapped_ti_count.register(TaskSDKBaseOperator)  # Some tests don't go 
through task serialization...
+def _(task: SerializedBaseOperator | TaskSDKBaseOperator, run_id: str, *, 
session: Session) -> int:
+group = task.get_closest_mapped_task_group()
+if group is None:
+raise NotMapped()
+return get_mapped_ti_count(group, run_id, session=session)
+
+
+@get_mapped_ti_count.register(MappedOperator)
+@get_mapped_ti_count.register(TaskSDKMappedOperator)  # Some tests don't go 
through task serialization...
+def _(task: MappedOperator | TaskSDKMappedOperator, run_id: str, *, session: 
Session) -> int:
+from airflow.serialization.serialized_objects import BaseSerialization, 
_ExpandInputRef
+
+exp_input = task._get_specified_expand_input()
+if isinstance(exp_input, _ExpandInputRef):
+exp_input = exp_input.deref(task.dag)
+# TODO: TaskSDK This is only needed to support `dag.test()` etc until we 
port it over to use the
+# task sdk runner.
+if not hasattr(exp_input, "get_total_map_length"):

Review Comment:
   Yeah this is moved from the old Baseoperator class. Let me try to remove it 
in main.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2191491897


##
airflow-core/src/airflow/models/renderedtifields.py:
##
@@ -49,9 +49,10 @@
 
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.sdk.types import Operator
+from airflow.serialization.serialized_objects import SerializedBaseOperator
 
 
-def get_serialized_template_fields(task: Operator):
+def get_serialized_template_fields(task: Operator | SerializedBaseOperator):

Review Comment:
   The goal ultimately is to have two BaseOperator-MappedOperator pairs, one in 
the sdk, th other in the scheduler. Each function, depending on what it is for, 
should only receive/return one pair or the other. The scheduler pair will be 
created later after MappedOperator is split like the BaseOperator, but I don’t 
feel we should have a common base for Operator (this is for the sdk pair) and 
SerializedOperator (used in core).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on PR #52234:
URL: https://github.com/apache/airflow/pull/52234#issuecomment-3046448780

   (rebased on main to get latest changes & running with full-tests now)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2190960709


##
airflow-core/src/airflow/models/taskinstance.py:
##
@@ -1898,22 +1897,23 @@ def get_template_context(
 :param session: SQLAlchemy ORM Session
 :param ignore_param_exceptions: flag to suppress value exceptions 
while initializing the ParamsDict
 """
-if TYPE_CHECKING:
-assert self.task
-assert isinstance(self.task.dag, SchedulerDAG)
-
 # Do not use provide_session here -- it expunges everything on exit!
 if not session:
 session = settings.Session()
 
+if TYPE_CHECKING:
+assert session
+assert isinstance(self.task, (BaseOperator, MappedOperator))

Review Comment:
   We can use `Operator` here I think since it is already defined at L136:
   
   ```
   Operator: TypeAlias = BaseOperator | MappedOperator
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2190959196


##
airflow-core/src/airflow/models/renderedtifields.py:
##
@@ -49,9 +49,10 @@
 
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.sdk.types import Operator
+from airflow.serialization.serialized_objects import SerializedBaseOperator
 
 
-def get_serialized_template_fields(task: Operator):
+def get_serialized_template_fields(task: Operator | SerializedBaseOperator):

Review Comment:
   (it is used a lot)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2190958855


##
airflow-core/src/airflow/models/renderedtifields.py:
##
@@ -49,9 +49,10 @@
 
 from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.sdk.types import Operator
+from airflow.serialization.serialized_objects import SerializedBaseOperator
 
 
-def get_serialized_template_fields(task: Operator):
+def get_serialized_template_fields(task: Operator | SerializedBaseOperator):

Review Comment:
   Should we just create an internal type for `Operator | 
SerializedBaseOperator` to avoid repetitions for type hints and isintance 
checks?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2190958032


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +151,88 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# https://github.com/python/cpython/issues/86153
+# While we support Python 3.9 we can't rely on the type hint, we need to pass 
the type explicitly to
+# register.
+@get_mapped_ti_count.register(SerializedBaseOperator)
+@get_mapped_ti_count.register(TaskSDKBaseOperator)  # Some tests don't go 
through task serialization...
+def _(task: SerializedBaseOperator | TaskSDKBaseOperator, run_id: str, *, 
session: Session) -> int:
+group = task.get_closest_mapped_task_group()
+if group is None:
+raise NotMapped()
+return get_mapped_ti_count(group, run_id, session=session)
+
+
+@get_mapped_ti_count.register(MappedOperator)
+@get_mapped_ti_count.register(TaskSDKMappedOperator)  # Some tests don't go 
through task serialization...
+def _(task: MappedOperator | TaskSDKMappedOperator, run_id: str, *, session: 
Session) -> int:
+from airflow.serialization.serialized_objects import BaseSerialization, 
_ExpandInputRef
+
+exp_input = task._get_specified_expand_input()
+if isinstance(exp_input, _ExpandInputRef):
+exp_input = exp_input.deref(task.dag)
+# TODO: TaskSDK This is only needed to support `dag.test()` etc until we 
port it over to use the
+# task sdk runner.
+if not hasattr(exp_input, "get_total_map_length"):

Review Comment:
   If it isn't just moving the code: fine to do it in separate PR too



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2190957504


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +151,88 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# https://github.com/python/cpython/issues/86153
+# While we support Python 3.9 we can't rely on the type hint, we need to pass 
the type explicitly to
+# register.
+@get_mapped_ti_count.register(SerializedBaseOperator)
+@get_mapped_ti_count.register(TaskSDKBaseOperator)  # Some tests don't go 
through task serialization...
+def _(task: SerializedBaseOperator | TaskSDKBaseOperator, run_id: str, *, 
session: Session) -> int:
+group = task.get_closest_mapped_task_group()
+if group is None:
+raise NotMapped()
+return get_mapped_ti_count(group, run_id, session=session)
+
+
+@get_mapped_ti_count.register(MappedOperator)
+@get_mapped_ti_count.register(TaskSDKMappedOperator)  # Some tests don't go 
through task serialization...
+def _(task: MappedOperator | TaskSDKMappedOperator, run_id: str, *, session: 
Session) -> int:
+from airflow.serialization.serialized_objects import BaseSerialization, 
_ExpandInputRef
+
+exp_input = task._get_specified_expand_input()
+if isinstance(exp_input, _ExpandInputRef):
+exp_input = exp_input.deref(task.dag)
+# TODO: TaskSDK This is only needed to support `dag.test()` etc until we 
port it over to use the
+# task sdk runner.
+if not hasattr(exp_input, "get_total_map_length"):

Review Comment:
   Does this mean we can get rid of it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-07 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2190955862


##
airflow-core/src/airflow/models/mappedoperator.py:
##
@@ -173,3 +151,88 @@ def get_extra_links(self, ti: TaskInstance, name: str) -> 
str | None:
 if not link:
 return None
 return link.get_link(self, ti_key=ti.key)  # type: ignore[arg-type]
+
+
[email protected]
+def get_mapped_ti_count(task: DAGNode, run_id: str, *, session: Session) -> 
int:
+raise NotImplementedError(f"Not implemented for {type(task)}")
+
+
+# https://github.com/python/cpython/issues/86153
+# While we support Python 3.9 we can't rely on the type hint, we need to pass 
the type explicitly to

Review Comment:
   We dropped support for Py 3.9 on main recently



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-06 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2188571740


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,22 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)

Review Comment:
   Looks like `dry_run` is still used in a handful of tests. It’s also used by 
`TI.dry_run()` (probably should also get rid of that).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-06 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2188571740


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,22 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)

Review Comment:
   Looks like `dry_run` is still used in a handful of tests. It’s also used by 
`TI.dry_run()` (probably should also get rid of that).
   
   Also it’s somehow called in `SFTPToWasbOperator`’s `__init__`! Which means 
we’d break every use of that operator if we remove the function outright. What 
the f.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-06 Thread via GitHub


uranusjr commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2188569133


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,22 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)

Review Comment:
   That’s just copied from ancient code. Renaming and maybe changing the 
presentation should be a separate conversation. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-06 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2188537966


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,22 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)

Review Comment:
   Does this render though? Looks like just logs on the `template_fields` 
as-is. Rendering would require Context dict



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-04 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2186076518


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,216 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)
+self.log.info(content)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_push(
+context: Any,
+key: str,
+value: Any,
+) -> None:
+"""
+Make an XCom available for tasks to pull.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom
+:param value: A value for the XCom. The value is pickled and stored
+in the database.
+"""
+context["ti"].xcom_push(key=key, value=value)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_pull(
+context: Any,
+task_ids: str | list[str] | None = None,
+dag_id: str | None = None,
+key: str = "return_value",
+include_prior_dates: bool | None = None,
+session=None,
+) -> Any:
+"""
+Pull XComs that optionally meet certain criteria.
+
+The default value for `key` limits the search to XComs
+that were returned by other tasks (as opposed to those that were pushed
+manually). To remove this filter, pass key=None (or any desired value).
+
+If a single task_id string is provided, the result is the value of the
+most recent matching XCom from that task_id. If multiple task_ids are
+provided, a tuple of matching values is returned. None is returned
+whenever no matches are found.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom. If provided, only XComs with matching
+keys will be returned. The default key is 'return_value', also
+available as a constant XCOM_RETURN_KEY. This key is automatically
+given to XComs returned by tasks (as opposed to being pushed
+manually). To remove the filter, pass key=None.
+:param task_ids: Only XComs from tasks with matching ids will be
+pulled. Can pass None to remove the filter.
+:param dag_id: If provided, only pulls XComs from this DAG.
+If None (default), the DAG of the calling task is used.
+:param include_prior_dates: If False, only XComs from the current
+logical_date are returned. If True, XComs from previous dates
+are returned as well.
+"""
+from airflow.settings import Session
+
+if session is None:
+session = Session()
+return context["ti"].xcom_pull(
+key=key,
+task_ids=task_ids,
+dag_id=dag_id,
+include_prior_dates=include_prior_dates,
+session=session,
+)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+def run(
+self,
+start_date: datetime | None = None,
+end_date: datetime | None = None,
+ignore_first_depends_on_past: bool = True,
+wait_for_past_depends_before_skipping: bool = False,
+ignore_ti_state: bool = False,
+mark_success: bool = False,
+test_mode: bool = False,
+session=None,
+) -> None:
+"""Run a set of task instances for a date range."""
+import pendulum
+from sqlalchemy import select
+from sqlalchemy.exc import NoResultFound
+
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.settings import Session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
+
+if session is None:
+session = Session()
+
+# Assertions for typing -- we need a dag, for this function, and when 
we have a DAG we are
+# _guaranteed_ to have start_date (else we couldn't have been added to 
a DAG)
+if TYPE_CHECKING:
+   

Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-04 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2185923336


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,216 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)
+self.log.info(content)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_push(
+context: Any,
+key: str,
+value: Any,
+) -> None:
+"""
+Make an XCom available for tasks to pull.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom
+:param value: A value for the XCom. The value is pickled and stored
+in the database.
+"""
+context["ti"].xcom_push(key=key, value=value)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_pull(
+context: Any,
+task_ids: str | list[str] | None = None,
+dag_id: str | None = None,
+key: str = "return_value",
+include_prior_dates: bool | None = None,
+session=None,
+) -> Any:
+"""
+Pull XComs that optionally meet certain criteria.
+
+The default value for `key` limits the search to XComs
+that were returned by other tasks (as opposed to those that were pushed
+manually). To remove this filter, pass key=None (or any desired value).
+
+If a single task_id string is provided, the result is the value of the
+most recent matching XCom from that task_id. If multiple task_ids are
+provided, a tuple of matching values is returned. None is returned
+whenever no matches are found.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom. If provided, only XComs with matching
+keys will be returned. The default key is 'return_value', also
+available as a constant XCOM_RETURN_KEY. This key is automatically
+given to XComs returned by tasks (as opposed to being pushed
+manually). To remove the filter, pass key=None.
+:param task_ids: Only XComs from tasks with matching ids will be
+pulled. Can pass None to remove the filter.
+:param dag_id: If provided, only pulls XComs from this DAG.
+If None (default), the DAG of the calling task is used.
+:param include_prior_dates: If False, only XComs from the current
+logical_date are returned. If True, XComs from previous dates
+are returned as well.
+"""
+from airflow.settings import Session
+
+if session is None:
+session = Session()
+return context["ti"].xcom_pull(
+key=key,
+task_ids=task_ids,
+dag_id=dag_id,
+include_prior_dates=include_prior_dates,
+session=session,
+)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+def run(
+self,
+start_date: datetime | None = None,
+end_date: datetime | None = None,
+ignore_first_depends_on_past: bool = True,
+wait_for_past_depends_before_skipping: bool = False,
+ignore_ti_state: bool = False,
+mark_success: bool = False,
+test_mode: bool = False,
+session=None,
+) -> None:
+"""Run a set of task instances for a date range."""
+import pendulum
+from sqlalchemy import select
+from sqlalchemy.exc import NoResultFound
+
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.settings import Session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
+
+if session is None:
+session = Session()
+
+# Assertions for typing -- we need a dag, for this function, and when 
we have a DAG we are
+# _guaranteed_ to have start_date (else we couldn't have been added to 
a DAG)
+if TYPE_CHECKING:
+   

Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-04 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2185918897


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,216 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)
+self.log.info(content)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_push(
+context: Any,
+key: str,
+value: Any,
+) -> None:
+"""
+Make an XCom available for tasks to pull.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom
+:param value: A value for the XCom. The value is pickled and stored
+in the database.
+"""
+context["ti"].xcom_push(key=key, value=value)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_pull(
+context: Any,
+task_ids: str | list[str] | None = None,
+dag_id: str | None = None,
+key: str = "return_value",
+include_prior_dates: bool | None = None,
+session=None,
+) -> Any:
+"""
+Pull XComs that optionally meet certain criteria.
+
+The default value for `key` limits the search to XComs
+that were returned by other tasks (as opposed to those that were pushed
+manually). To remove this filter, pass key=None (or any desired value).
+
+If a single task_id string is provided, the result is the value of the
+most recent matching XCom from that task_id. If multiple task_ids are
+provided, a tuple of matching values is returned. None is returned
+whenever no matches are found.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom. If provided, only XComs with matching
+keys will be returned. The default key is 'return_value', also
+available as a constant XCOM_RETURN_KEY. This key is automatically
+given to XComs returned by tasks (as opposed to being pushed
+manually). To remove the filter, pass key=None.
+:param task_ids: Only XComs from tasks with matching ids will be
+pulled. Can pass None to remove the filter.
+:param dag_id: If provided, only pulls XComs from this DAG.
+If None (default), the DAG of the calling task is used.
+:param include_prior_dates: If False, only XComs from the current
+logical_date are returned. If True, XComs from previous dates
+are returned as well.
+"""
+from airflow.settings import Session
+
+if session is None:
+session = Session()
+return context["ti"].xcom_pull(
+key=key,
+task_ids=task_ids,
+dag_id=dag_id,
+include_prior_dates=include_prior_dates,
+session=session,
+)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+def run(
+self,
+start_date: datetime | None = None,
+end_date: datetime | None = None,
+ignore_first_depends_on_past: bool = True,
+wait_for_past_depends_before_skipping: bool = False,
+ignore_ti_state: bool = False,
+mark_success: bool = False,
+test_mode: bool = False,
+session=None,
+) -> None:
+"""Run a set of task instances for a date range."""
+import pendulum
+from sqlalchemy import select
+from sqlalchemy.exc import NoResultFound
+
+from airflow.models.dagrun import DagRun
+from airflow.models.taskinstance import TaskInstance
+from airflow.settings import Session
+from airflow.utils.state import DagRunState
+from airflow.utils.types import DagRunTriggeredByType, DagRunType
+
+if session is None:
+session = Session()
+
+# Assertions for typing -- we need a dag, for this function, and when 
we have a DAG we are
+# _guaranteed_ to have start_date (else we couldn't have been added to 
a DAG)
+if TYPE_CHECKING:
+   

Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-07-04 Thread via GitHub


kaxil commented on code in PR #52234:
URL: https://github.com/apache/airflow/pull/52234#discussion_r2185918340


##
task-sdk/src/airflow/sdk/bases/operator.py:
##
@@ -1607,6 +1615,216 @@ def resume_execution(self, next_method: str, 
next_kwargs: dict[str, Any] | None,
 execute_callable = getattr(self, next_method)
 return execute_callable(context, **next_kwargs)
 
+def dry_run(self) -> None:
+"""Perform dry run for the operator - just render template fields."""
+self.log.info("Dry run")
+for f in self.template_fields:
+try:
+content = getattr(self, f)
+except AttributeError:
+raise AttributeError(
+f"{f!r} is configured as a template field "
+f"but {self.task_type} does not have this attribute."
+)
+
+if content and isinstance(content, str):
+self.log.info("Rendering template for %s", f)
+self.log.info(content)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_push(
+context: Any,
+key: str,
+value: Any,
+) -> None:
+"""
+Make an XCom available for tasks to pull.
+
+:param context: Execution Context Dictionary
+:param key: A key for the XCom
+:param value: A value for the XCom. The value is pickled and stored
+in the database.
+"""
+context["ti"].xcom_push(key=key, value=value)
+
+# TODO (GH-52141): Either port this, or somehow fix the tests to remove 
this from the sdk.
+@staticmethod
+def xcom_pull(
+context: Any,
+task_ids: str | list[str] | None = None,
+dag_id: str | None = None,
+key: str = "return_value",
+include_prior_dates: bool | None = None,
+session=None,
+) -> Any:

Review Comment:
   This shouldn't be needed anymore after 
https://github.com/apache/airflow/issues/52378
   
   Atleast not in Providers -- Need to check if `airflow-core/tests` use it -- 
but should be easy.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



Re: [PR] Remove direct scheduler BaseOperator refs [airflow]

2025-06-29 Thread via GitHub


uranusjr closed pull request #52234: Remove direct scheduler BaseOperator refs
URL: https://github.com/apache/airflow/pull/52234


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]