Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh merged PR #45924:
URL: https://github.com/apache/airflow/pull/45924


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1928101314


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   Yeah, kept it there itself.






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1928101426


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -243,6 +244,17 @@ def ti_update_state(
             else:
                 updated_state = State.FAILED
         query = query.values(state=updated_state)
+    elif isinstance(ti_patch_payload, TISuccessStatePayload):
+        query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+        updated_state = ti_patch_payload.state
+        task_instance = session.get(TI, ti_id_str)
+        TI._register_asset_changes_int(

Review Comment:
   This has been fixed






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1928101055


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,40 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+        asset_type = ""
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetNameAndUri(name=obj.name, uri=obj.uri))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetNameAndUri(name=obj.name))
+                # send all as we do not know how to filter here yet
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetNameAndUri(uri=obj.uri))
+                # send all as we do not know how to filter here yet
+                outlet_events.append(attrs.asdict(events))  # type: ignore

Review Comment:
   Issue for tracking https://github.com/apache/airflow/issues/46002






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1928101216


##
airflow/api_fastapi/execution_api/datamodels/asset.py:
##
@@ -34,3 +34,18 @@ class AssetAliasResponse(BaseModel):
 
     name: str
     group: str
+
+
+class AssetProfile(BaseModel):
+    """
+    Profile of an Asset.
+
+    Asset will have name, uri and asset_type defined.
+    AssetNameRef will have name and asset_type defined.
+    AssetUriRef will have uri and asset_type defined.
+
+    """
+
+    name: str | None = None
+    uri: str | None = None
+    asset_type: str

Review Comment:
   Fixed.






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on PR #45924:
URL: https://github.com/apache/airflow/pull/45924#issuecomment-2611619373

   I will just resolve the conversations with relevant replies, rebase & merge this.





Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1927348708


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   It can be in this file I think -- it just makes it easier to understand what it does if it's got a name 😄 






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926962938


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -43,6 +44,7 @@
 SetRenderedFields,
 SetXCom,
 StartupDetails,
+SucceedTask,

Review Comment:
   Oh yeah, `SucceedTask` is better then.



##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -43,6 +44,7 @@
 SetRenderedFields,
 SetXCom,
 StartupDetails,
+SucceedTask,

Review Comment:
   Leave them as you have them






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926828616


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   Yeah, let me do that. I have similar logic in the task runner and `_run_raw_task` too. 
   
   Wondering if I can move it to, let's say, `definitions/assets/utils.py`? Honestly, `definitions` doesn't look like a good place to me, but I can't think of another location. Any suggestions?






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926828616


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   Yeah, let me do that. I have similar logic in the task runner too. 
   
   Wondering if I can move it to, let's say, `definitions/assets/utils.py`? Honestly, `definitions` doesn't look like a good place to me, but I can't think of another location. Any suggestions?






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926816976


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
     # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
     # retries from the task SDK now, we can handle using max_tries
     return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   This discussion is outdated; I have made an effort to unify it instead of maintaining two versions.






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926822464


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -43,6 +44,7 @@
 SetRenderedFields,
 SetXCom,
 StartupDetails,
+SucceedTask,

Review Comment:
   I was aligning with `DeferTask`, `RescheduleTask`, etc. I am OK making this change, but should we do the same for the remaining states too?
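A minimal sketch of the naming convention under discussion, where each supervisor message is a verb-first command (`DeferTask`, `SucceedTask`) carrying a `type` discriminator. These are hypothetical stdlib dataclasses, not the Task SDK's actual message models; the `SucceedTask` fields are assumptions based on the `succeed()` client signature in this PR.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Literal


# Hypothetical stand-ins: the real Task SDK messages are defined elsewhere
# and may carry different fields.
@dataclass
class DeferTask:
    next_method: str
    type: Literal["DeferTask"] = "DeferTask"


@dataclass
class SucceedTask:
    end_date: datetime
    task_outlets: list[dict[str, Any]] = field(default_factory=list)
    outlet_events: list[dict[str, Any]] = field(default_factory=list)
    type: Literal["SucceedTask"] = "SucceedTask"


def message_kind(msg: DeferTask | SucceedTask) -> str:
    # The "type" field lets the receiving side route a message without
    # isinstance-checking every class.
    return msg.type
```

With this shape, a `SucceedTask` built at the end of `run()` would carry the outlet profiles and events alongside `end_date`.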



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926821670


##
task_sdk/src/airflow/sdk/api/client.py:
##
@@ -136,6 +137,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState, when: datetime):
         body = TITerminalStatePayload(end_date=when, state=TerminalTIState(state))
         self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())
 
+    def succeed(self, id: uuid.UUID, when: datetime, task_outlets, outlet_events):
+        """Tell the API server that this TI has to succeed."""

Review Comment:
   Right, it has succeeded! Not going to succeed.






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926820042


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -243,6 +244,17 @@ def ti_update_state(
             else:
                 updated_state = State.FAILED
         query = query.values(state=updated_state)
+    elif isinstance(ti_patch_payload, TISuccessStatePayload):
+        query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+        updated_state = ti_patch_payload.state
+        task_instance = session.get(TI, ti_id_str)
+        TI._register_asset_changes_int(

Review Comment:
   That's right, let me make the changes for that.
   
   I thought `int` meant `in table` lol!






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926818832


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +63,39 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
     """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""

Review Comment:
   Oh yes, made the changes for that






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926659629


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -243,6 +244,17 @@ def ti_update_state(
             else:
                 updated_state = State.FAILED
         query = query.values(state=updated_state)
+    elif isinstance(ti_patch_payload, TISuccessStatePayload):
+        query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+        updated_state = ti_patch_payload.state
+        task_instance = session.get(TI, ti_id_str)
+        TI._register_asset_changes_int(

Review Comment:
   Oh and the `int` suffix! That means `internal` but it's not internal anymore.






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926655219


##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -43,6 +44,7 @@
 SetRenderedFields,
 SetXCom,
 StartupDetails,
+SucceedTask,

Review Comment:
   Let's be a bit more consistent with `TaskState` here:
   ```suggestion
   TaskSuccess,
   ```



##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -243,6 +244,17 @@ def ti_update_state(
             else:
                 updated_state = State.FAILED
         query = query.values(state=updated_state)
+    elif isinstance(ti_patch_payload, TISuccessStatePayload):
+        query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+        updated_state = ti_patch_payload.state
+        task_instance = session.get(TI, ti_id_str)
+        TI._register_asset_changes_int(

Review Comment:
   Nit/not even quite a nit: if this method is meant to be called from things outside the TI class, then it shouldn't be prefixed with `_`.
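The success branch of `ti_update_state` in the hunk above can be sketched in plain Python as below. This is a hypothetical, DB-free illustration: the payload classes are stdlib stand-ins for the Pydantic models, and `register_asset_changes` is an illustrative public name (no leading `_`, per the nit) whose body only counts outlets instead of writing asset events.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime


# Hypothetical stand-ins for the execution API payload models.
@dataclass
class TITerminalStatePayload:
    state: str
    end_date: datetime


@dataclass
class TISuccessStatePayload:
    end_date: datetime
    task_outlets: list[dict]
    outlet_events: list[dict]
    state: str = "success"


def register_asset_changes(ti_id: str, task_outlets: list[dict], outlet_events: list[dict]) -> int:
    """Public helper (no leading underscore) since callers outside the TI class use it."""
    # Placeholder body: a real implementation would persist asset events.
    return len(task_outlets)


def update_state(ti_id: str, payload: TITerminalStatePayload | TISuccessStatePayload) -> str:
    """Return the state to store, registering asset changes on success."""
    if isinstance(payload, TITerminalStatePayload):
        return payload.state
    if isinstance(payload, TISuccessStatePayload):
        register_asset_changes(ti_id, payload.task_outlets, payload.outlet_events)
        return payload.state
    raise TypeError(f"unsupported payload: {type(payload).__name__}")
```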



##
task_sdk/src/airflow/sdk/api/client.py:
##
@@ -136,6 +137,11 @@ def finish(self, id: uuid.UUID, state: TerminalTIState, when: datetime):
         body = TITerminalStatePayload(end_date=when, state=TerminalTIState(state))
         self.client.patch(f"task-instances/{id}/state", content=body.model_dump_json())
 
+    def succeed(self, id: uuid.UUID, when: datetime, task_outlets, outlet_events):
+        """Tell the API server that this TI has to succeed."""

Review Comment:
   ```suggestion
   """Tell the API server that this TI has succeeded."""
   ```
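A hypothetical sketch of what a `succeed()` call might send: a PATCH body for `task-instances/{id}/state` containing the new state, the end date and the outlet data. The real client serialises a Pydantic payload with `model_dump_json()`; this stdlib version, including the `build_succeed_request` name and the exact field names, is an assumption for illustration.

```python
from __future__ import annotations

import json
import uuid
from datetime import datetime


def build_succeed_request(
    ti_id: uuid.UUID,
    when: datetime,
    task_outlets: list[dict],
    outlet_events: list[dict],
) -> tuple[str, str]:
    """Return the (path, JSON body) for telling the API server that a TI has succeeded."""
    body = {
        "state": "success",
        "end_date": when.isoformat(),
        # One profile per outlet, each with its own asset_type.
        "task_outlets": task_outlets,
        "outlet_events": outlet_events,
    }
    return f"task-instances/{ti_id}/state", json.dumps(body)
```

Usage: `path, body = build_succeed_request(ti_id, now, outlets, events)`, then PATCH `body` to `path`.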



##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   For clarity, let's pull this all out into a function called something like `process_assets` or `process_outlets`.
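A sketch of the suggested extraction, assuming simplified stdlib stand-ins for `Asset`, `AssetNameRef`, `AssetUriRef`, `AssetAlias` and the outlet event accessor (all hypothetical here; the real classes come from the Task SDK and are serialised with `attrs`). `process_outlets` is one of the names floated in the comment. One simplification: the name/URI reference branches forward every event as list entries rather than as a single dict of all events.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass, field
from typing import Any


# Hypothetical stand-ins for the Task SDK asset definitions.
@dataclass(frozen=True)
class Asset:
    name: str
    uri: str


@dataclass(frozen=True)
class AssetNameRef:
    name: str


@dataclass(frozen=True)
class AssetUriRef:
    uri: str


@dataclass(frozen=True)
class AssetAlias:
    name: str


@dataclass
class OutletEvent:
    # Simplified stand-in for the per-outlet event accessor.
    extra: dict[str, Any] = field(default_factory=dict)
    asset_alias_events: list[dict[str, Any]] = field(default_factory=list)


def process_outlets(outlets, events: dict) -> tuple[list[dict], list[dict]]:
    """Build (task_outlets, outlet_events) for the success payload."""
    task_outlets: list[dict] = []
    outlet_events: list[dict] = []
    for obj in outlets or []:
        # Recorded per outlet so mixed outlet types are not collapsed.
        asset_type = type(obj).__name__
        if isinstance(obj, Asset):
            task_outlets.append({"name": obj.name, "uri": obj.uri, "asset_type": asset_type})
            outlet_events.append(asdict(events[obj]))
        elif isinstance(obj, AssetNameRef):
            task_outlets.append({"name": obj.name, "asset_type": asset_type})
            # Send all events; filtering can be done in the API server.
            outlet_events.extend(asdict(e) for e in events.values())
        elif isinstance(obj, AssetUriRef):
            task_outlets.append({"uri": obj.uri, "asset_type": asset_type})
            outlet_events.extend(asdict(e) for e in events.values())
        elif isinstance(obj, AssetAlias):
            task_outlets.append({"asset_type": asset_type})
            outlet_events.extend(events[obj].asset_alias_events)
        # Lineage can contain other object types; they are skipped.
    return task_outlets, outlet_events
```

Assuming the real helper took the same arguments, one call would replace the inline loop in both the task runner and `_run_raw_task`, which is the duplication mentioned in the replies.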






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on PR #45924:
URL: https://github.com/apache/airflow/pull/45924#issuecomment-2609303044

   OK, I think I figured out the reason for the failure; working on a fix.





Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926645568


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +63,39 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
     """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""

Review Comment:
   Comment is now wrong






Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on PR #45924:
URL: https://github.com/apache/airflow/pull/45924#issuecomment-2609282772

   Legacy without my changes:
   ```
   2f107d64f771
   ▶ Log message source details
   [2025-01-23, 09:14:33 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
   [2025-01-23, 09:14:33 UTC] {logging_mixin.py:212} INFO - AssetEvent(id=2, asset_id=1, extra={}, source_task_id='produce_asset_events_through_asset_alias', source_dag_id='asset_alias_example_alias_producer', source_run_id='manual__2025-01-23T09:14:17.503997+00:00', source_map_index=-1, source_aliases=[AssetAliasModel(name='example-alias')])
   [2025-01-23, 09:14:33 UTC] {python.py:198} INFO - Done. Returned value was: None
   [2025-01-23, 09:14:33 UTC] {taskinstance.py:331} ▶ Post task execution logs
   ```
   
   
   Legacy with my changes:
   ```
   ae19b24f9ec4
   ▶ Log message source details
   [2025-01-23, 09:18:54 UTC] {local_task_job_runner.py:120} ▶ Pre task execution logs
   [2025-01-23, 09:18:54 UTC] {logging_mixin.py:212} INFO - AssetEvent(id=2, asset_id=1, extra={}, source_task_id='produce_asset_events_through_asset_alias', source_dag_id='asset_alias_example_alias_producer', source_run_id='manual__2025-01-23T09:18:36.263671+00:00', source_map_index=-1, source_aliases=[AssetAliasModel(name='example-alias')])
   [2025-01-23, 09:18:54 UTC] {python.py:198} INFO - Done. Returned value was: None
   [2025-01-23, 09:18:54 UTC] {taskinstance.py:332} ▶ Post task execution logs
   ```
   
   The output is the same, so it's likely the test cases that need changing.





Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-23 Thread via GitHub


amoghrajesh commented on PR #45924:
URL: https://github.com/apache/airflow/pull/45924#issuecomment-2609212969

   Looks like the changes in this PR break some test cases for inlet_events.





Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on PR #45924:
URL: https://github.com/apache/airflow/pull/45924#issuecomment-2609087454

   With the new changes, I tested both legacy and Task SDK DAGs.
   
   Legacy results:
   
![image](https://github.com/user-attachments/assets/78fbd716-b512-4f74-9d1c-36ef8b183b73)
   
   
   Task SDK results:
   (The first two failures of asset_s3_bucket_producer are unrelated, and asset_alias_example_alias_consumer fails because inlet_events is not working yet.)
   
![image](https://github.com/user-attachments/assets/5f5b5178-ba07-4566-b1a4-2fe91690dec8)
   





Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1926463681


##
airflow/api_fastapi/execution_api/datamodels/asset.py:
##
@@ -34,3 +34,18 @@ class AssetAliasResponse(BaseModel):
 
     name: str
     group: str
+
+
+class AssetProfile(BaseModel):
+    """
+    Profile of an Asset.
+
+    Asset will have name, uri and asset_type defined.
+    AssetNameRef will have name and asset_type defined.
+    AssetUriRef will have uri and asset_type defined.
+
+    """
+
+    name: str | None = None
+    uri: str | None = None
+    asset_type: str

Review Comment:
   I realised there was a bug in my earlier code where I was defining `asset_type` at the payload level. The issue with that is that if more than one outlet is passed, it always takes the `asset_type` of the last outlet and does not register events for the earlier ones.



##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,39 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
         _push_xcom_if_needed(result, ti)
 
+        task_outlets = []
+        outlet_events = []
+        events = context["outlet_events"]
+
+        for obj in ti.task.outlets or []:
+            # Lineage can have other types of objects besides assets
+            asset_type = type(obj).__name__
+            if isinstance(obj, Asset):
+                task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+            elif isinstance(obj, AssetNameRef):
+                task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetUriRef):
+                task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                # Send all events, filtering can be done in API server.
+                outlet_events.append(attrs.asdict(events))  # type: ignore
+            elif isinstance(obj, AssetAlias):
+                task_outlets.append(AssetProfile(asset_type=asset_type))
+                for asset_alias_event in events[obj].asset_alias_events:
+                    outlet_events.append(attrs.asdict(asset_alias_event))

Review Comment:
   Each of these branches has an asset type defined, even though name and uri are optional.
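A stdlib sketch of the `AssetProfile` contract from the docstring above, with a hypothetical `__post_init__` check that each `asset_type` carries the identifying fields it should. The real model is a Pydantic `BaseModel` with no such validation in this diff; `asset_type` is listed first here only because dataclass fields without defaults must precede those with defaults. Per the task runner code, `AssetAlias` profiles carry only `asset_type`.

```python
from __future__ import annotations

from dataclasses import dataclass

# Identifying fields each asset_type is expected to carry, per the
# AssetProfile docstring; AssetAlias carries neither.
REQUIRED_FIELDS = {
    "Asset": {"name", "uri"},
    "AssetNameRef": {"name"},
    "AssetUriRef": {"uri"},
    "AssetAlias": set(),
}


@dataclass
class AssetProfile:
    asset_type: str
    name: str | None = None
    uri: str | None = None

    def __post_init__(self) -> None:
        # Hypothetical validation, not present in the PR's model.
        required = REQUIRED_FIELDS.get(self.asset_type)
        if required is None:
            raise ValueError(f"unknown asset_type: {self.asset_type!r}")
        missing = [f for f in sorted(required) if getattr(self, f) is None]
        if missing:
            raise ValueError(f"{self.asset_type} requires {', '.join(missing)}")
```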



##
airflow/models/taskinstance.py:
##
@@ -352,7 +353,26 @@ def _run_raw_task(
         if not test_mode:
             _add_log(event=ti.state, task_instance=ti, session=session)
             if ti.state == TaskInstanceState.SUCCESS:
-                ti._register_asset_changes(events=context["outlet_events"], session=session)
+                task_outlets = []
+                outlet_events = []
+                events = context["outlet_events"]
+                for obj in ti.task.outlets or []:
+                    # Lineage can have other types of objects besides assets
+                    asset_type = type(obj).__name__
+                    if isinstance(obj, Asset):
+                        task_outlets.append(AssetProfile(name=obj.name, uri=obj.uri, asset_type=asset_type))
+                        outlet_events.append(attrs.asdict(events[obj]))  # type: ignore
+                    elif isinstance(obj, AssetNameRef):
+                        task_outlets.append(AssetProfile(name=obj.name, asset_type=asset_type))
+                        outlet_events.append(attrs.asdict(events))  # type: ignore
+                    elif isinstance(obj, AssetUriRef):
+                        task_outlets.append(AssetProfile(uri=obj.uri, asset_type=asset_type))
+                        outlet_events.append(attrs.asdict(events))  # type: ignore
+                    elif isinstance(obj, AssetAlias):
+                        task_outlets.append(AssetProfile(asset_type=asset_type))
+                        for asset_alias_event in events[obj].asset_alias_events:
+                            outlet_events.append(attrs.asdict(asset_alias_event))
+                TaskInstance._register_asset_changes_int(ti, task_outlets, outlet_events, session=session)

Review Comment:
   This makes it possible to nuke:
   https://github.com/apache/airflow/pull/45924/files#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487L2736-L2742
   
   I'm not a big fan of having to iterate here to calculate the types and populate:
   ```
   task_outlets = []
   outlet_events = []
   ```
   
   And then iterate again here:
   https://github.com/apache/airflow/pull/45924/files#diff-62f7d8a52fefdb8e05d4f040c6d3459b4a56fe46976c24f68843dbaeb5a98487R2781-R2795
   
   But it should be OK for now.
   
   




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925425251


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   I would really like that too, but it is significantly trickier for a few reasons.
   
   In the API payload, we have the following details:
   1. The task outlets in `[name, uri]` form, along with the type of asset each one is: `Asset`, `AssetAlias`, `AssetNameRef` or `AssetUriRef`.
   2. The events associated with each asset type: for assets we send `events[obj]`, and for asset aliases we send `events[obj].asset_alias_events`, where `events` is `context["outlet_events"]`.
   
   Reusing the existing function would require adding a lot of `if`/`else` checks and a better way to identify the `obj` (either via `isinstance` or `==`) here:
   https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L2759-L2776
   
   @kaxil and I were also discussing that `_register_asset_changes` is overly complicated and would benefit from some rewriting too.
   
   I can still give it a shot if you'd like me to.
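Because the server only receives serialized profiles, it has to dispatch on the `asset_type` string rather than on `isinstance`. The sketch below shows that dispatch. It assumes each outlet arrives as a plain dict with `name`, `uri` and `asset_type` keys, modelled on the payload described above. `partition_outlets` is a hypothetical helper, not code from the PR.

```python
def partition_outlets(task_outlets):
    """Split serialized outlet profiles by their asset_type string.

    Hypothetical helper: the dict shape is assumed, and unknown types are
    ignored, as non-asset lineage objects are on the task side.
    """
    assets = []          # (name, uri) pairs that can be registered directly
    name_refs = set()    # names to resolve against the DB later
    uri_refs = set()     # URIs to resolve against the DB later
    has_alias = False    # alias events travel in outlet_events instead
    for profile in task_outlets:
        kind = profile.get("asset_type")
        if kind == "Asset":
            assets.append((profile["name"], profile["uri"]))
        elif kind == "AssetNameRef":
            name_refs.add(profile["name"])
        elif kind == "AssetUriRef":
            uri_refs.add(profile["uri"])
        elif kind == "AssetAlias":
            has_alias = True
    return assets, name_refs, uri_refs, has_alias
```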
   




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925413204


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):
+# One task only triggers one asset event for each asset with the same extra.
+# This tuple[asset uri, extra] to sets alias names mapping is used to find whether
+# there're assets with same uri but different extra that we need to emit more than one asset events.
+asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set)
+asset_name_refs: set[str] = set()
+asset_uri_refs: set[str] = set()
+
+for obj in task_outlets:
+# Lineage can have other types of objects besides assets
+if asset_type == "Asset":
+asset_manager.register_asset_change(
+task_instance=task_instance,

Review Comment:
   Looks like we need those columns for the time being. Would it be OK to tackle this in a follow-up while doing:
   
   > Also perhaps in future work we should update Asset tables to use TI UUID to link instead.




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925409594


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +64,52 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
 """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
 
-state: TerminalTIState
+state: Literal[
+TerminalTIState.FAILED,
+TerminalTIState.SKIPPED,
+TerminalTIState.REMOVED,
+TerminalTIState.FAIL_WITHOUT_RETRY,
+]
 
 end_date: UtcDateTime
 """When the task completed executing"""
 
 
+class TISuccessStatePayload(BaseModel):
+"""Schema for updating TaskInstance to success state."""
+
+state: Annotated[
+Literal[TerminalTIState.SUCCESS],
+# Specify a default in the schema, but not in code, so Pydantic marks it as required.
+WithJsonSchema(
+{
+"type": "string",
+"enum": [TerminalTIState.SUCCESS],
+"default": TerminalTIState.SUCCESS,
+}
+),
+]
+
+end_date: UtcDateTime
+"""When the task completed executing"""
+
+task_outlets: Annotated[list[AssetNameAndUri], Field(default_factory=list)]
+outlet_events: Annotated[list[Any], Field(default_factory=list)]
+asset_type: str | None = None
+
+@root_validator(pre=True)
+def parse_json_fields(cls, values):
+import json
+
+if "task_outlets" in values and isinstance(values["task_outlets"], str):
+values["task_outlets"] = json.loads(values["task_outlets"])
+
+if "outlet_events" in values and isinstance(values["outlet_events"], str):
+values["outlet_events"] = json.loads(values["outlet_events"])

Review Comment:
   Actually, yeah, we do not need this. Let me remove it.
   
   I added it because, while testing, I was reading the task logs from the Logs tab in the UI, where the payload appears as a JSON string. Example:
   ```
   {"json":"{\"state\":\"success\",\"end_date\":\"2025-01-22T12:29:35.298127Z\",\"task_outlets\":[{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"}],\"outlet_events\":[{\"key\":{\"name\":\"s3://bucket/my-task\",\"uri\":\"s3://bucket/my-task\"},\"extra\":{},\"asset_alias_events\":[]}],\"asset_type\":\"Asset\",\"type\":\"SucceedTask\"}\n","timestamp":"2025-01-22T12:29:35.298180","logger":"task","event":"Sending request","level":"debug"}
   ```
   
   
   That led me to believe the payload would arrive in that format, but Pydantic takes care of the parsing. I tested it, and I'm removing this piece.
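The log line only looks double-encoded because the log record stores the request body as a string in its `json` field. A single decode of that body already yields lists for `task_outlets` and `outlet_events`. A server that parses the request body once, as FastAPI/Pydantic do during validation, therefore never sees those fields as strings. Below is a stdlib-only sketch using an abbreviated copy of the logged body. The `log_record` dict is illustrative, not the real log schema.

```python
import json

# Abbreviated from the debug log line above: the log wraps the body in a string.
log_record = {
    "json": json.dumps(
        {
            "state": "success",
            "task_outlets": [{"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"}],
            "outlet_events": [
                {
                    "key": {"name": "s3://bucket/my-task", "uri": "s3://bucket/my-task"},
                    "extra": {},
                    "asset_alias_events": [],
                }
            ],
            "asset_type": "Asset",
        }
    ),
    "event": "Sending request",
}

# The HTTP body is that string; decoding it once is all the server needs to do.
body = json.loads(log_record["json"])
```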




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925313387


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   We should ideally have only one version of this function. Can we call that one from here?




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925234586


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   Yeah, it is a lot of code, and it is taken from https://github.com/apache/airflow/blob/051e617e0d7d0ebb995cb98063709350f279963c/airflow/models/taskinstance.py#L359 (I mentioned this in the PR description too).




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925239518


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +64,52 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
 """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
 
-state: TerminalTIState
+state: Literal[
+TerminalTIState.FAILED,
+TerminalTIState.SKIPPED,
+TerminalTIState.REMOVED,
+TerminalTIState.FAIL_WITHOUT_RETRY,
+]
 
 end_date: UtcDateTime
 """When the task completed executing"""
 
 
+class TISuccessStatePayload(BaseModel):
+"""Schema for updating TaskInstance to success state."""
+
+state: Annotated[
+Literal[TerminalTIState.SUCCESS],
+# Specify a default in the schema, but not in code, so Pydantic marks it as required.
+WithJsonSchema(
+{
+"type": "string",
+"enum": [TerminalTIState.SUCCESS],
+"default": TerminalTIState.SUCCESS,
+}
+),
+]
+
+end_date: UtcDateTime
+"""When the task completed executing"""
+
+task_outlets: Annotated[list[AssetNameAndUri], Field(default_factory=list)]
+outlet_events: Annotated[list[Any], Field(default_factory=list)]

Review Comment:
   We make do with empty lists right now, so it's not necessary to send these fields. See the example in the test cases: `test_ti_update_state_to_terminal`.
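The trade-off between the two options can be illustrated with a stdlib analogue. This uses `dataclasses` rather than Pydantic, so the details differ. With `default_factory=list`, each instance gets its own empty list when the field is omitted, and consumers can iterate without a `None` check. With `Optional[...] = None`, the client can also skip the field, but the server must handle `None`. The class names below are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional


@dataclass
class WithEmptyDefault:
    # Omitted -> a fresh empty list per instance (never shared between instances).
    task_outlets: List[Any] = field(default_factory=list)


@dataclass
class WithNoneDefault:
    # Omitted -> None, so every consumer needs an explicit check.
    task_outlets: Optional[List[Any]] = None


def count_outlets(payload) -> int:
    # `or []` is the None check that WithNoneDefault forces on callers.
    return len(payload.task_outlets or [])
```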




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925224982


##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -243,6 +252,19 @@ def ti_update_state(
 else:
 updated_state = State.FAILED
 query = query.values(state=updated_state)
+elif isinstance(ti_patch_payload, TISuccessStatePayload):
+query = TI.duration_expression_update(ti_patch_payload.end_date, query, session.bind)
+updated_state = ti_patch_payload.state
+task_instance = session.get(TI, ti_id_str)

Review Comment:
   Do we "need" to get the TI? 



##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   This function feels out of place in the routes file and would be better placed in `airflow.models.assets`, I think.
   
   Or perhaps on the asset manager.



##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):

Review Comment:
   This also feels like it's a lot of code that should exist somewhere else. Was it copied? Did you write it all from scratch just for this PR?



##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):
+# One task only triggers one asset event for each asset with the same extra.
+# This tuple[asset uri, extra] to sets alias names mapping is used to find whether
+# there're assets with same uri but different extra that we need to emit more than one asset events.
+asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set)
+asset_name_refs: set[str] = set()
+asset_uri_refs: set[str] = set()
+
+for obj in task_outlets:
+# Lineage can have other types of objects besides assets
+if asset_type == "Asset":
+asset_manager.register_asset_change(
+task_instance=task_instance,

Review Comment:
   We are only passing a TaskInstance object here for the following use cases:
   
   ```
   if task_instance:
   event_kwargs.update(
   source_task_id=task_instance.task_id,
   source_dag_id=task_instance.dag_id,
   source_run_id=task_instance.run_id,
   source_map_index=task_instance.map_index,
   )
   ```
   
   We don't need a full TI; a TIKey would be enough for that if it makes things easier, i.e. so we don't need to get the full TI from the DB. But if we need to select those columns anyway, then it doesn't make much difference.
   
   Also perhaps in future work we should update Asset tables to use TI UUID to link instead.
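The `source_*` columns above can be derived from a lightweight key instead of a full TI row. The sketch below demonstrates this. `TIKeyStandIn` is a hypothetical NamedTuple shaped like a task-instance key, with assumed field names and defaults, and `source_event_kwargs` is an illustrative helper, not the asset manager's actual code.

```python
from typing import NamedTuple, Optional


class TIKeyStandIn(NamedTuple):
    """Hypothetical stand-in for a task-instance key; field names are assumed."""

    dag_id: str
    task_id: str
    run_id: str
    try_number: int = 1
    map_index: int = -1


def source_event_kwargs(ti_key: Optional[TIKeyStandIn]) -> dict:
    """Build the source_* columns for an asset event from a key, not a full TI."""
    if ti_key is None:
        return {}
    return {
        "source_task_id": ti_key.task_id,
        "source_dag_id": ti_key.dag_id,
        "source_run_id": ti_key.run_id,
        "source_map_index": ti_key.map_index,
    }
```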




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925221925


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +64,52 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
 """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
 
-state: TerminalTIState
+state: Literal[
+TerminalTIState.FAILED,
+TerminalTIState.SKIPPED,
+TerminalTIState.REMOVED,
+TerminalTIState.FAIL_WITHOUT_RETRY,
+]
 
 end_date: UtcDateTime
 """When the task completed executing"""
 
 
+class TISuccessStatePayload(BaseModel):
+"""Schema for updating TaskInstance to success state."""
+
+state: Annotated[
+Literal[TerminalTIState.SUCCESS],
+# Specify a default in the schema, but not in code, so Pydantic marks it as required.
+WithJsonSchema(
+{
+"type": "string",
+"enum": [TerminalTIState.SUCCESS],
+"default": TerminalTIState.SUCCESS,
+}
+),
+]
+
+end_date: UtcDateTime
+"""When the task completed executing"""
+
+task_outlets: Annotated[list[AssetNameAndUri], Field(default_factory=list)]
+outlet_events: Annotated[list[Any], Field(default_factory=list)]
+asset_type: str | None = None
+
+@root_validator(pre=True)
+def parse_json_fields(cls, values):
+import json
+
+if "task_outlets" in values and isinstance(values["task_outlets"], str):
+values["task_outlets"] = json.loads(values["task_outlets"])
+
+if "outlet_events" in values and isinstance(values["outlet_events"], str):
+values["outlet_events"] = json.loads(values["outlet_events"])

Review Comment:
   This seems _highly_ suspect. Why do we have to do any json parsing ourselves?




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925216358


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +64,52 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
 """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
 
-state: TerminalTIState
+state: Literal[
+TerminalTIState.FAILED,
+TerminalTIState.SKIPPED,
+TerminalTIState.REMOVED,
+TerminalTIState.FAIL_WITHOUT_RETRY,
+]
 
 end_date: UtcDateTime
 """When the task completed executing"""
 
 
+class TISuccessStatePayload(BaseModel):
+"""Schema for updating TaskInstance to success state."""
+
+state: Annotated[
+Literal[TerminalTIState.SUCCESS],
+# Specify a default in the schema, but not in code, so Pydantic marks it as required.
+WithJsonSchema(
+{
+"type": "string",
+"enum": [TerminalTIState.SUCCESS],
+"default": TerminalTIState.SUCCESS,
+}
+),
+]
+
+end_date: UtcDateTime
+"""When the task completed executing"""
+
+task_outlets: Annotated[list[AssetNameAndUri], Field(default_factory=list)]
+outlet_events: Annotated[list[Any], Field(default_factory=list)]
+asset_type: str | None = None
+
+@root_validator(pre=True)
+def parse_json_fields(cls, values):
+import json
+
+if "task_outlets" in values and isinstance(values["task_outlets"], str):
+values["task_outlets"] = json.loads(values["task_outlets"])
+
+if "outlet_events" in values and isinstance(values["outlet_events"], str):
+values["outlet_events"] = json.loads(values["outlet_events"])
+
+return values

Review Comment:
   This is done so that the client and server can communicate in JSON string format.



##
tests/api_fastapi/execution_api/routes/test_task_instances.py:
##
@@ -322,6 +415,7 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta
 "airflow.api_fastapi.common.db.common.Session.execute",
 side_effect=[
 mock.Mock(one=lambda: ("running", 1, 0)),  # First call returns "queued"
+mock.Mock(one=lambda: ("running", 1, 0)),  # Second call returns "queued"

Review Comment:
   The success state adds another query to the "ti" table, hence the extra entry is needed.
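When `side_effect` is a list, each call to the mock returns the next item, and a call past the end raises `StopIteration`. An extra query on the success path therefore needs an extra entry. The sketch below reproduces that behaviour. The mock name and return values are illustrative only.

```python
from unittest import mock

execute = mock.Mock(
    side_effect=[
        mock.Mock(one=lambda: ("running", 1, 0)),  # first query
        mock.Mock(one=lambda: ("running", 1, 0)),  # second query (success path)
    ]
)

first = execute().one()
second = execute().one()

try:
    execute()  # a third query would exhaust the side_effect list
except StopIteration:
    exhausted = True
else:
    exhausted = False
```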



##
airflow/api_fastapi/execution_api/routes/task_instances.py:
##
@@ -440,3 +462,103 @@ def _is_eligible_to_retry(state: str, try_number: int, max_tries: int) -> bool:
 # max_tries is initialised with the retries defined at task level, we do not need to explicitly ask for
 # retries from the task SDK now, we can handle using max_tries
 return max_tries != 0 and try_number <= max_tries
+
+
+def register_asset_changes(task_instance, task_outlets, outlet_events, asset_type, session):
+# One task only triggers one asset event for each asset with the same extra.
+# This tuple[asset uri, extra] to sets alias names mapping is used to find whether
+# there're assets with same uri but different extra that we need to emit more than one asset events.
+asset_alias_names: dict[tuple[AssetUniqueKey, frozenset], set[str]] = defaultdict(set)
+asset_name_refs: set[str] = set()
+asset_uri_refs: set[str] = set()
+
+for obj in task_outlets:
+# Lineage can have other types of objects besides assets
+if asset_type == "Asset":
+asset_manager.register_asset_change(
+task_instance=task_instance,
+asset=Asset(name=obj.name, uri=obj.uri),
+extra=outlet_events,
+session=session,
+)
+elif asset_type == "AssetNameRef":
+asset_name_refs.add(obj.name)
+elif asset_type == "AssetUriRef":
+asset_uri_refs.add(obj.uri)
+
+if asset_type == "AssetAlias":
+# deserialize to the expected type
+outlet_events = list(
+map(
+lambda event: {**event, "dest_asset_key": AssetUniqueKey(**event["dest_asset_key"])},
+outlet_events,
+)
+)

Review Comment:
   Converting it to `AssetUniqueKey` format keeps the access below simple and in line with the legacy code. Otherwise, line 502 will complain that we cannot use a mutable key for a dictionary.
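A JSON-decoded `dest_asset_key` is a plain `dict`, which is unhashable. It therefore cannot appear inside a dictionary key such as the `(AssetUniqueKey, frozenset)` tuples used above. The sketch below shows the conversion under stated assumptions. `AssetUniqueKeyStandIn` is a hypothetical frozen dataclass (the real `AssetUniqueKey` may differ). The event keys `source_alias_name`, `dest_asset_key` and `extra` are assumed, and `extra` values are assumed hashable so that `frozenset(extra.items())` works.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class AssetUniqueKeyStandIn:
    """Hypothetical hashable key; frozen dataclasses get a generated __hash__."""

    name: str
    uri: str


def group_alias_events(outlet_events):
    """Map (asset key, frozen extra) -> set of alias names that emitted it."""
    alias_names = defaultdict(set)
    for event in outlet_events:
        # Rebuild a hashable key from the decoded dict before using it in a dict key.
        key = AssetUniqueKeyStandIn(**event["dest_asset_key"])
        extra = frozenset(event.get("extra", {}).items())
        alias_names[(key, extra)].add(event["source_alias_name"])
    return dict(alias_names)
```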



##
task_sdk/src/airflow/sdk/execution_time/task_runner.py:
##
@@ -479,12 +481,40 @@ def run(ti: RuntimeTaskInstance, log: Logger):
 
 _push_xcom_if_needed(result, ti)
 
+task_outlets = []
+outlet_events = []
+events = context["outlet_events"]
+asset_type = ""
+
+for obj in ti.task.outlets or []:
+# Lineage can have other types of objects besides assets
+if isinstan

Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


ashb commented on code in PR #45924:
URL: https://github.com/apache/airflow/pull/45924#discussion_r1925220697


##
airflow/api_fastapi/execution_api/datamodels/taskinstance.py:
##
@@ -54,12 +64,52 @@ class TIEnterRunningPayload(BaseModel):
 class TITerminalStatePayload(BaseModel):
 """Schema for updating TaskInstance to a terminal state (e.g., SUCCESS or FAILED)."""
 
-state: TerminalTIState
+state: Literal[
+TerminalTIState.FAILED,
+TerminalTIState.SKIPPED,
+TerminalTIState.REMOVED,
+TerminalTIState.FAIL_WITHOUT_RETRY,
+]
 
 end_date: UtcDateTime
 """When the task completed executing"""
 
 
+class TISuccessStatePayload(BaseModel):
+"""Schema for updating TaskInstance to success state."""
+
+state: Annotated[
+Literal[TerminalTIState.SUCCESS],
+# Specify a default in the schema, but not in code, so Pydantic marks it as required.
+WithJsonSchema(
+{
+"type": "string",
+"enum": [TerminalTIState.SUCCESS],
+"default": TerminalTIState.SUCCESS,
+}
+),
+]
+
+end_date: UtcDateTime
+"""When the task completed executing"""
+
+task_outlets: Annotated[list[AssetNameAndUri], Field(default_factory=list)]
+outlet_events: Annotated[list[Any], Field(default_factory=list)]

Review Comment:
   Should these be `| None = None`? i.e. not sent if there's nothing needed?




Re: [PR] AIP-72: Port Registering of Asset Changes to Task SDK on task completion [airflow]

2025-01-22 Thread via GitHub


amoghrajesh commented on PR #45924:
URL: https://github.com/apache/airflow/pull/45924#issuecomment-2606992208

   Working on fixing the tests

