The GitHub Actions job "Tests" on airflow.git has failed.
Run started by GitHub user ashb (triggered by ashb).

Head commit for run:
c4ffd3294c131e53e4a3becab96aa0a89aa89ad0 / Ash Berlin-Taylor <[email protected]>
Start building the replacement task runner for Task Execution SDK

The eventual goal of this "airflow.sdk.execution_time" package is to replace
LocalTaskJob and StandardTaskRunner, but at this stage it co-exists with the
code it will eventually replace.

As this PR is not a complete re-implementation of all the features that exist
currently (no handling of task-level callbacks yet, no AirflowSkipException,
etc.) the current tests are skeletal at best. Once we get closer to feature
parity (in future PRs) the tests will grow to match.

This supervisor and task runner operate slightly differently from the current
classes in the following ways:

**Logs from the subprocess are sent over a separate channel from stdout/stderr**

This makes the task supervisor a little more complex, as it now has to read
stdout, stderr, and a logs channel. The advantage of this approach is that it
makes the logging setup in the task process itself markedly simpler -- all it
has to do is write log output to the custom file handle as JSON and it will
show up "natively" as logs.

structlog has been chosen as the logging engine over stdlib's own logging, as
the ability to have structured fields in the logs is nice, and stdlib logging
is configured to send its records to a structlog processor.

**Direct database access is replaced with an HTTP API client**

This is the crux of this feature and of AIP-72 in general -- tasks run via
this runner can no longer access DB models or a DB session directly. This PR
doesn't yet implement the code/shims to make
`Connection.get_connection_from_secrets`
use this client - that will be future work.

Tasks don't speak directly to the API server for two main reasons:

1. The supervisor process already needs to maintain an HTTP session in order
   to report the task as started, to heartbeat it, and to mark it as
   finished; and
2. Routing task requests through the supervisor keeps the number of active
   HTTP connections at 1 per task (instead of 2 per task).
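The proxying idea behind point 2 can be sketched as follows. Everything here
is illustrative -- the frame names, fields, and `http_get` callable stand in
for whatever the supervisor's real API session looks like:

```python
import json
from typing import Any


class SupervisorProxy:
    """Services task requests using the supervisor's one existing HTTP session."""

    def __init__(self, http_get):
        # `http_get(path) -> dict` stands in for the supervisor's API client,
        # the same session it already uses for heartbeats.
        self._http_get = http_get

    def handle(self, frame: str) -> str:
        req = json.loads(frame)
        if req["type"] == "GetConnection":
            body = self._http_get(f"/connections/{req['conn_id']}")
            return json.dumps({"type": "ConnectionResult", **body})
        return json.dumps({"type": "Error", "detail": "unknown request"})


def task_get_connection(proxy: SupervisorProxy, conn_id: str) -> dict[str, Any]:
    """Task side: serialize a request frame instead of opening an HTTP connection."""
    frame = json.dumps({"type": "GetConnection", "conn_id": conn_id})
    return json.loads(proxy.handle(frame))
```

In the real runner the frames would cross a pipe or socket between the two
processes rather than a direct method call, but the shape is the same: one
HTTP session, owned by the supervisor, shared by everything the task needs.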

The other reason we have this interface is that DAG parsing code will very
soon need to be updated to not have direct DB access either, and having this
"in process" interface already means that we can support commands like
`airflow dags reserialize` without having a running API server.

The API client itself is not auto-generated: I tried a number of different
client generators based on the OpenAPI spec and found them all lacking or
buggy in different ways, and the HTTP client side itself is very simple. The
only interesting/difficult bit is the generation of the datamodels from the
OpenAPI spec, for which I did find a generator that works.

msgspec was chosen over Pydantic as it is much lighter weight (and thus
quicker), especially on the client side where we have next to no validation
requirements for response data. I admit that I have not benchmarked it
specifically, though.

Report URL: https://github.com/apache/airflow/actions/runs/11800248413

With regards,
GitHub Actions via GitBox

