I just took a look and it turns out that DebugExecutor works fine with
triggerer you just need to have one running.

You could run one in a subprocess.  I experimented with refactoring the
subprocess hook for this purpose (so you can start the subprocess
asynchronously) and then ran this dag with debug executor and it worked.

from __future__ import annotations

import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"

from datetime import timedelta

import pendulum

from airflow.models import DAG
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id="example_sensors",
    schedule="@once",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
) as dag:
    t0a = TimeDeltaSensorAsync(task_id="wait_some_seconds_async",
delta=timedelta(seconds=2))

from airflow.hooks.subprocess import SubprocessHook

# let's get triggerer running in a subprocess
hook = SubprocessHook()
triggerer_process = hook.start_process(command=["airflow",
"triggerer"], cwd="/tmp")

# now let's run the dag
dag.clear()
dag.run()

# now the dag has completed

# kill triggerer
hook.send_sigterm(triggerer_process)
result = hook.process_output(subprocess=triggerer_process)
assert result.exit_code == 0

# now triggerer has exited and our system test is done


So, something like this could be done.

Reply via email to