Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-09 Thread via GitHub


dstandish merged PR #38570:
URL: https://github.com/apache/airflow/pull/38570


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-09 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1557750374


##
airflow/serialization/serialized_objects.py:
##
@@ -538,9 +538,9 @@ def serialize(
         elif isinstance(var, Resources):
             return var.to_dict()
         elif isinstance(var, MappedOperator):
-            return SerializedBaseOperator.serialize_mapped_operator(var)
+            return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)

Review Comment:
   I don't know what's at stake here? What's the alternative way?  If that's
better, why not do it?



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-08 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1556927388


##
airflow/serialization/pydantic/dag_run.py:
##
@@ -78,7 +78,7 @@ def get_task_instances(
 def get_task_instance(
 self,
 task_id: str,
-session: Session,
+session: Session | None = None,

Review Comment:
   i think this may be unnecessary.  will revert and see what happens.



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-08 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1556922160


##
airflow/cli/commands/task_command.py:
##
@@ -156,8 +156,10 @@ def _get_dag_run(
     raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")
 
 
+@internal_api_call
 @provide_session
-def _get_ti(
+def _get_ti_db_access(
+    dag: DAG,
     task: Operator,

Review Comment:
   ok



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-08 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1556918483


##
airflow/serialization/serialized_objects.py:
##
@@ -1462,9 +1462,15 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
                 v = set(v)
             elif k == "tasks":
                 SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links
-
-                v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
+                tasks = {}
+                for obj in v:
+                    if obj.get(Encoding.TYPE) == DAT.OP:
+                        deser = SerializedBaseOperator.deserialize_operator(obj[Encoding.VAR])
+                        tasks[deser.task_id] = deser
+                    else:  # this is backcompat for pre-2.10
+                        tasks[obj["task_id"]] = SerializedBaseOperator.deserialize_operator(obj)

Review Comment:
   for better or worse, the default setting is to _not_ reserialize:

   https://github.com/apache/airflow/blob/fecc1ed8eff5192818bbe04cbbdfe9585eaab583/airflow/cli/cli_config.py#L646-L653



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-08 Thread via GitHub


uranusjr commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r119108


##
airflow/serialization/pydantic/dag_run.py:
##
@@ -78,7 +78,7 @@ def get_task_instances(
 def get_task_instance(
 self,
 task_id: str,
-session: Session,
+session: Session | None = None,

Review Comment:
   ref https://github.com/apache/airflow/pull/38563#discussion_r1553938903
   
   I thought the same when seeing this change. Instead of (potentially?) adding
`| None` everywhere, it may be better to introduce a fake session object to
simplify typing and runtime behaviour. We can also use `typing.cast()` to fake
the Session type so the imports can stay unchanged.



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-08 Thread via GitHub


uranusjr commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1555274634


##
airflow/serialization/serialized_objects.py:
##
@@ -1462,9 +1462,15 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
                 v = set(v)
             elif k == "tasks":
                 SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links
-
-                v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
+                tasks = {}
+                for obj in v:
+                    if obj.get(Encoding.TYPE) == DAT.OP:
+                        deser = SerializedBaseOperator.deserialize_operator(obj[Encoding.VAR])
+                        tasks[deser.task_id] = deser
+                    else:  # this is backcompat for pre-2.10
+                        tasks[obj["task_id"]] = SerializedBaseOperator.deserialize_operator(obj)

Review Comment:
   Don’t we always require (?) reserialising when Airflow is upgraded? This 
wouldn’t be needed if that’s the case.



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-08 Thread via GitHub


uranusjr commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1555272673


##
airflow/cli/commands/task_command.py:
##
@@ -156,8 +156,10 @@ def _get_dag_run(
     raise ValueError(f"unknown create_if_necessary value: {create_if_necessary!r}")
 
 
+@internal_api_call
 @provide_session
-def _get_ti(
+def _get_ti_db_access(
+    dag: DAG,
     task: Operator,

Review Comment:
   Maybe we should add a check to guard against misuse where the task is from a
different DAG.
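
A guard of that kind could be as small as the sketch below. The `Dag` and `Task` dataclasses and the function name `check_task_belongs_to_dag` are hypothetical stand-ins, assuming only that both objects expose a `dag_id`; the real signature of `_get_ti_db_access` is not reproduced here.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Dag:
    """Hypothetical stand-in for Airflow's DAG."""

    dag_id: str


@dataclass
class Task:
    """Hypothetical stand-in for an Operator."""

    task_id: str
    dag_id: str


def check_task_belongs_to_dag(dag: Dag, task: Task) -> None:
    """Raise if `task` was defined in a DAG other than `dag`."""
    if task.dag_id != dag.dag_id:
        raise ValueError(
            f"task {task.task_id!r} belongs to DAG {task.dag_id!r}, not {dag.dag_id!r}"
        )
```

Calling it at the top of the function would turn a silent mismatch into an immediate `ValueError`.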



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-07 Thread via GitHub


potiuk commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1555080840


##
airflow/serialization/serialized_objects.py:
##
@@ -538,9 +538,9 @@ def serialize(
         elif isinstance(var, Resources):
             return var.to_dict()
         elif isinstance(var, MappedOperator):
-            return SerializedBaseOperator.serialize_mapped_operator(var)
+            return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)

Review Comment:
   It's fine for me. If we want to continue using Airflow serde for all
pydantic objects as well, this is the right thing to do. I **think** we don't:
we should be able to rely purely on Pydantic serializing TaskInstance as
TaskInstancePydantic. But yes, if we want to do it, it's fine.



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1548192232


##
airflow/serialization/serialized_objects.py:
##
@@ -1462,9 +1462,15 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
                 v = set(v)
             elif k == "tasks":
                 SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links
-
-                v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
+                tasks = {}
+                for obj in v:
+                    if obj.get(Encoding.TYPE) == DAT.OP:
+                        deser = SerializedBaseOperator.deserialize_operator(obj[Encoding.VAR])
+                        tasks[deser.task_id] = deser
+                    else:  # this is backcompat for pre-2.10
+                        tasks[obj["task_id"]] = SerializedBaseOperator.deserialize_operator(obj)

Review Comment:
   note also that if we merge this PR with these changes, then we can remove
this logic in the other PR:
https://github.com/apache/airflow/pull/38567/files#diff-807ca0a4fd53aeb41166621c9842b0f89b7931fc64e9a60befa36c776db45efaR664



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1548191053


##
airflow/serialization/serialized_objects.py:
##
@@ -1462,9 +1462,15 @@ def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
                 v = set(v)
             elif k == "tasks":
                 SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links
-
-                v = {task["task_id"]: SerializedBaseOperator.deserialize_operator(task) for task in v}
+                tasks = {}
+                for obj in v:
+                    if obj.get(Encoding.TYPE) == DAT.OP:
+                        deser = SerializedBaseOperator.deserialize_operator(obj[Encoding.VAR])
+                        tasks[deser.task_id] = deser
+                    else:  # this is backcompat for pre-2.10
+                        tasks[obj["task_id"]] = SerializedBaseOperator.deserialize_operator(obj)

Review Comment:
   @potiuk @uranusjr this is necessitated by the change to run the serialized
operator through `cls._encode` (like we already do for everything else)



Re: [PR] Make _get_ti compatible with RPC [airflow]

2024-04-02 Thread via GitHub


dstandish commented on code in PR #38570:
URL: https://github.com/apache/airflow/pull/38570#discussion_r1548190050


##
airflow/serialization/serialized_objects.py:
##
@@ -538,9 +538,9 @@ def serialize(
         elif isinstance(var, Resources):
             return var.to_dict()
         elif isinstance(var, MappedOperator):
-            return SerializedBaseOperator.serialize_mapped_operator(var)
+            return cls._encode(SerializedBaseOperator.serialize_mapped_operator(var), type_=DAT.OP)

Review Comment:
   @potiuk @uranusjr this PR needs a closer look.  what i'm doing is making it 
so that when we serialize an Operator, we actually run it through `cls._encode` 
so that on the other side we actually know what the object is.  we got away 
with *not* doing that up to now because `task` is always serialized as part of 
something else (e.g. TI or DAG) and custom serialization handles it.  but with 
AIP-44 there are times when we want to be able to serialize a task object 
directly and thus we need a way to do it. see below for backcompat code for 
deserializing old and new way.
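
The wrap-then-unwrap idea described here can be sketched in isolation as follows. This is a simplified illustration, not Airflow's code: `TYPE_KEY`, `VAR_KEY` and `OP` are illustrative stand-ins for `Encoding.TYPE`, `Encoding.VAR` and `DAT.OP`, the helper names `encode` and `load_tasks` are hypothetical, and plain dicts take the place of deserialized operators.

```python
from __future__ import annotations

from typing import Any

# Illustrative stand-ins for Encoding.TYPE, Encoding.VAR and DAT.OP.
TYPE_KEY = "__type"
VAR_KEY = "__var"
OP = "operator"


def encode(var: dict[str, Any], type_: str) -> dict[str, Any]:
    """Wrap a serialized payload so the receiver knows what it is."""
    return {VAR_KEY: var, TYPE_KEY: type_}


def load_tasks(serialized_tasks: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Index tasks by task_id, accepting both wrapped and bare payloads."""
    tasks: dict[str, dict[str, Any]] = {}
    for obj in serialized_tasks:
        if obj.get(TYPE_KEY) == OP:
            payload = obj[VAR_KEY]  # new format: unwrap the envelope
        else:
            payload = obj  # old format: the dict is the task itself
        tasks[payload["task_id"]] = payload
    return tasks
```

A mixed list such as `[encode({"task_id": "a"}, type_=OP), {"task_id": "b"}]` loads both entries, which is the property the backcompat branch is meant to preserve.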


