The GitHub Actions job "Tests" on airflow.git has succeeded.
Run started by GitHub user kaxil (triggered by kaxil).

Head commit for run:
6e0dbbf31d909f25ad89563178a2f687bbebd045 / Kaxil Naik <[email protected]>
AIP-72: Add "update TI state" endpoint for Execution API

Part of https://github.com/apache/airflow/issues/43586

This PR adds a new endpoint `/execution/{task_instance_id}/state` that allows 
updating the state of a TI from the worker.

Some of the interesting changes / TILs were:

(hat tip to @ashb for this)

To streamline the data exchange between workers and the Task Execution API, 
this PR adds minified schemas for Task Instance updates, i.e. schemas that 
focus solely on the fields necessary for specific state transitions, reducing 
payload size and validation overhead. Since our TaskInstance model is huge, 
this also keeps things clean by focusing only on the fields that matter for 
this case.
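
As a hedged illustration (the class name follows the PR description; the exact fields and validation settings are assumptions), a minified schema might look like:

```python
# Illustrative sketch of a minified update schema: only the fields a worker
# needs for one specific transition, not the full TaskInstance model.
from datetime import datetime

from pydantic import BaseModel, ConfigDict


class TIEnterRunningPayload(BaseModel):
    """Fields needed to report that a task instance entered RUNNING."""

    # Reject unexpected fields so malformed client payloads fail fast.
    model_config = ConfigDict(extra="forbid")

    hostname: str
    unixname: str
    pid: int
    start_date: datetime


payload = TIEnterRunningPayload(
    hostname="worker-1",
    unixname="airflow",
    pid=1234,
    start_date=datetime(2024, 11, 4, 20, 0, 0),
)
```

Setting `extra="forbid"` is one way to make the schema strictly minimal: any field outside the transition's needs is rejected at validation time.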

The endpoint added in this PR also leverages Pydantic’s [discriminated 
unions](https://docs.pydantic.dev/latest/concepts/unions/#discriminated-unions) 
to handle varying payload structures for each target state. This allows a 
single endpoint to receive different payloads (with different validations). For 
example:

- `TIEnterRunningPayload`: Requires fields such as `hostname`, `unixname`, 
`pid`, and `start_date` to mark a task as RUNNING.
- `TITerminalStatePayload`: Supports terminal states such as SUCCESS, FAILED, 
and SKIPPED.
- `TITargetStatePayload`: Allows other non-terminal, non-running states that a 
task may transition to.

This way we avoid invalid payloads: for example, a `start_date` sent when a 
task is marked as SUCCESS doesn't make sense and might indicate an error on 
the client side!
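
A minimal sketch of the discriminated-union idea (the class names come from the PR description; the `state` values and fields are illustrative assumptions):

```python
# Sketch: Pydantic routes validation to the right payload model based on the
# "state" discriminator, so each target state gets its own validation rules.
from datetime import datetime
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class TIEnterRunningPayload(BaseModel):
    state: Literal["running"] = "running"
    hostname: str
    unixname: str
    pid: int
    start_date: datetime


class TITerminalStatePayload(BaseModel):
    state: Literal["success", "failed", "skipped"]
    end_date: datetime


class TITargetStatePayload(BaseModel):
    # Other non-terminal, non-running states (illustrative subset).
    state: Literal["deferred", "up_for_retry"]


TIStateUpdate = Annotated[
    Union[TIEnterRunningPayload, TITerminalStatePayload, TITargetStatePayload],
    Field(discriminator="state"),
]

adapter = TypeAdapter(TIStateUpdate)
# The "state" value selects TITerminalStatePayload here, which then
# requires end_date and nothing else:
update = adapter.validate_python(
    {"state": "success", "end_date": "2024-11-04T20:00:00"}
)
```

Annotating a FastAPI request body with such a union gives the single endpoint per-state validation for free, which is the behavior described above.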

![Nov-04-2024 
20-00-26](https://github.com/user-attachments/assets/07c1a197-0238-4c1a-9783-f23dd74a8d3e)

`fastapi` re-exports a handy `status` module from Starlette whose constants 
carry both the status code and its reason in their names. Reference: 
https://fastapi.tiangolo.com/reference/status/
Example:

`status.HTTP_204_NO_CONTENT` and `status.HTTP_409_CONFLICT` say a lot more 
than a bare "204 code", which doesn't tell you much. I plan to change our 
current integers on the public API to these in the coming days.

For now, I have assumed that we/the user don't care about `end_date` for the 
`REMOVED` & `UPSTREAM_FAILED` states, since those should be handled by the 
scheduler and shouldn't even show up on the worker. The `SKIPPED` state has 
two scenarios: (1) a user runs the task and raises an `AirflowSkipException`, 
or (2) the task is skipped by the scheduler itself. For (1) we could set an 
end date, but (2) doesn't have one.

- [ ] Pass an [RFC 9457](https://datatracker.ietf.org/doc/html/rfc9457)-compliant 
error message in the "detail" field of `HTTPException` to provide more 
information about the error.
- [ ] Add a separate heartbeat endpoint to track the TI’s active state.
- [ ] Replace handling of `SQLAlchemyError` with FastAPI's [Custom Exception 
handling](https://fastapi.tiangolo.com/tutorial/handling-errors/#install-custom-exception-handlers)
 across the Execution API endpoints, so we don't duplicate code across 
multiple endpoints.
- [ ] Replace `None` state on TaskInstance with a `Created` state. 
([link](https://github.com/orgs/apache/projects/405/views/1?pane=issue&itemId=85900878))
- [ ] Remove redundant code that also sets the task type once we remove DB 
access from the worker. This assumes that the Webserver and the new FastAPI 
endpoints don't use this endpoint.

Report URL: https://github.com/apache/airflow/actions/runs/11672243410

With regards,
GitHub Actions via GitBox


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
