The GitHub Actions job "Tests" on airflow.git has succeeded.
Run started by GitHub user kaxil (triggered by kaxil).

Head commit for run:
016c22be060b31935676b8db63f79c0297aee1d3 / Amogh <[email protected]>
AIP-72: Inline DAG injection for task runner tests

closes: https://github.com/apache/airflow/issues/44805
Dependent on https://github.com/apache/airflow/pull/44786

Whenever we port handling for another TI state to the task runner, we usually
follow it with an integration-style test that exercises the end-to-end flow
for that state. For example:
1. For skipped state, we use the DAG 
https://github.com/apache/airflow/pull/44786/files#diff-cabbddd33130ce1a769412f5fc55dd23e4af4d0fa75f8981689daae769e0680dR1
 and we test using the UT in task runner: 
https://github.com/apache/airflow/pull/44786/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R141-R161
2. For deferred state, we use the DAG: 
https://github.com/apache/airflow/pull/44241/files#diff-2152ed5392424771e27a69173b3c18caae717939719df8f5dbbbdfee5f9efd9bR1
 and test it using UT in task runner: 
https://github.com/apache/airflow/pull/44241/files#diff-413c3c59636a3c7b41b8bb822827d18a959778d0b6331532e0db175c829dbfd2R93-R127

As a result, every new TI state (or new test, for that matter) adds another
DAG file under `task_sdk/tests/dags`, and the folder could soon grow
unmanageable.

The solution has two parts:
1. The ability to create dynamic, inline DAGs, implemented using a
DAGFactory-style function:
```
def get_inline_dag(dag_id: str, tasks: BaseOperator) -> DAG:
    dag = DAG(
        dag_id=dag_id,
        default_args={"start_date": timezone.datetime(2024, 12, 3)},
    )
    setattr(tasks, "dag", dag)

    return dag

```
This function currently accepts a single task, creates a DAG from it, and
returns the DAG object, which should suffice for our current testing needs.
If needed, we can extend it to support multiple tasks and their
relationships.
Usage:
```
    task = PythonOperator(
        task_id="skip",
        python_callable=lambda: (_ for _ in ()).throw(
            AirflowSkipException("This task is being skipped intentionally."),
        ),
    )

    dag = get_inline_dag("basic_skipped", task)
```
Usage is as simple as creating a task from any operator and passing it to
this function.
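
The `python_callable` in the usage above relies on a small trick: a lambda
cannot contain a `raise` statement, so the exception is thrown into an empty
generator, which lets it propagate to the caller. A minimal standalone sketch
of that trick follows. `SkipForDemo` is a hypothetical stand-in for
`AirflowSkipException`, used only so the snippet runs without Airflow:

```python
class SkipForDemo(Exception):
    """Hypothetical stand-in for AirflowSkipException (not an Airflow class)."""


# `(_ for _ in ())` builds an empty generator. Calling `.throw(exc)` raises
# `exc` inside it; nothing catches it there, so it propagates to the caller.
raise_skip = lambda: (_ for _ in ()).throw(SkipForDemo("skipped intentionally"))

try:
    raise_skip()
except SkipForDemo as exc:
    caught = str(exc)
else:
    caught = None  # not reached: the lambda always raises
```

A plain `def` with a `raise` statement would behave the same; the lambda form
just keeps the task definition on one expression.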

2. Mocking the parse function using KGB spy_agency: 
https://pypi.org/project/kgb/
The idea here is to use a spy agency to substitute the `parse` function with
a mock parser that does the bare minimum of what the actual parser does. We
chose spy_agency over the mock library for two main reasons:
a) With `spy_agency`, you can mock specific methods or functions without
affecting the entire class or module.
b) Minimal disruption and ease of use.
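
The substitution idea can be sketched without any third-party dependency.
Note that the snippet below deliberately uses the standard library's
`unittest.mock` rather than kgb, so it shows only the general pattern
(swap `parse` for a minimal fake, then restore it), not the actual test
code. The names `task_runner`, `real_parse`, `startup` and `mock_parse` are
hypothetical stand-ins, and the returned dict is a placeholder for a DAG:

```python
from types import SimpleNamespace
from unittest import mock


def real_parse(what):
    # Hypothetical stand-in for the real parser, which would load a DAG file.
    raise RuntimeError("would read a DAG file from disk")


# Hypothetical stand-in for the module that owns `parse`.
task_runner = SimpleNamespace(parse=real_parse)


def startup(what):
    # The code under test looks up `parse` on its owner at call time.
    return task_runner.parse(what)


def mock_parse(what):
    # Bare-minimum parser: build the result inline instead of reading a file.
    return {"dag_id": what["dag_id"], "source": "inline"}


# Replace only `parse`; everything else on `task_runner` is left untouched.
with mock.patch.object(task_runner, "parse", side_effect=mock_parse) as spy:
    result = startup({"dag_id": "basic_skipped"})
    call_count = spy.call_count

# Leaving the `with` block restores the original function.
restored = task_runner.parse is real_parse
```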

1. Replaced all "actual" DAGs with inline DAGs in the task runner tests that
either parse or run a DAG.
2. Deleted two DAGs.
3. The other two DAGs cannot be removed yet, as they are tied to
test_supervisor.py tests that currently use the DAG path. This can be handled
in a follow-up if needed.
Example:
![image](https://github.com/user-attachments/assets/01baa82a-7b43-4ff1-bc7e-c2fc20cef50d)

1. No need to create any more DAG files for task runner integration tests,
which could otherwise be frequent at the current development rate for AIP-72.
2. Ability to easily create inline DAGs.

Basic DAG
![image](https://github.com/user-attachments/assets/cf7a94b5-6c4c-4103-99a0-32047207a9b2)

Deferred DAG
![image](https://github.com/user-attachments/assets/328f99d0-4483-48c5-9127-dd7812f47ae0)

Co-Authored-By: Kaxil Naik <[email protected]>

Report URL: https://github.com/apache/airflow/actions/runs/12263378845

With regards,
GitHub Actions via GitBox


