The GitHub Actions job "Tests" on airflow.git/triggerer-comms-66412 has 
succeeded.
Run started by GitHub user jedcunningham (triggered by jedcunningham).

Head commit for run:
8e97f9a6640a9494328e2940b051bb1ee6ee7a19 / Ash Berlin-Taylor <[email protected]>
Fix triggerer crash when multiple triggers call sync SDK methods concurrently 
(#66412)

TriggerCommsDecoder used an asyncio.Lock to serialise concurrent requests,
then read the response while holding the lock.  When a trigger called a
sync SDK method wrapped with sync_to_async(), that method ran in a worker
thread and internally used async_to_sync() to make async calls, spinning
up a second event loop in that thread.  The two event loops then raced to
read from the same asyncio.StreamReader, producing "Response read out of
order!" and crashing the TriggerRunner subprocess.  PR #64882 attempted a
threading.Lock fix but introduced a new deadlock where the triggerer
heartbeats normally while processing zero triggers.

Replace the lock-based serial approach with response multiplexing:

* Each asend() registers an asyncio.Future keyed by frame.id in a
  _pending dict.  A single _reader_loop background task reads frames
  one at a time and dispatches each response to the right waiter.
  No lock, no ordering assumption.

* sync send() from worker threads schedules asend() on the main event
  loop via run_coroutine_threadsafe() and blocks the calling thread.
  This eliminates competing event loops entirely.

* sync send() from the event loop thread itself (e.g. a trigger calling
  a sync SDK method directly from async def run() via greenback) would
  deadlock with run_coroutine_threadsafe(...).result() because .result()
  blocks the thread the loop runs on.  Detected via thread-ID comparison;
  greenback.await_() is used instead to teleport the coroutine back into
  the running loop.

* asend() called from a foreign event loop (async_to_sync inside a
  sync_to_async-wrapped trigger) detects the loop mismatch and bridges
  back to the main loop via run_coroutine_threadsafe + wrap_future so
  the foreign loop can still await the result.  This is the root-cause
  code path, covered directly by test_foreign_loop_path.

* init_comms() reads the initial StartTriggerer frame before starting
  the reader loop so the loop never races with initialisation.
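
The multiplexing scheme described in the bullets above can be sketched
roughly as follows. This is a minimal illustration, not the code from the
commit: MuxClient, fake_server and demo are hypothetical names, in-memory
asyncio.Queue objects stand in for the real StreamReader/writer pair, and
frames are plain (id, payload) tuples. The greenback path for sync calls
made on the loop thread is left out; the sketch simply raises there.

```python
import asyncio
import contextlib
import itertools
import threading


class MuxClient:
    """Illustrative only: names and framing are assumptions, not Airflow's code."""

    def __init__(self, outbox: asyncio.Queue, inbox: asyncio.Queue) -> None:
        self._outbox = outbox  # requests we send, as (id, payload)
        self._inbox = inbox    # responses we receive, as (id, payload)
        self._pending: dict[int, asyncio.Future] = {}
        self._ids = itertools.count()
        self._loop = asyncio.get_running_loop()  # the "main" loop
        self._loop_thread = threading.get_ident()
        self._reader_task = self._loop.create_task(self._reader_loop())

    async def _reader_loop(self) -> None:
        # The only reader of responses: dispatch each frame to its waiter by id.
        while True:
            frame_id, payload = await self._inbox.get()
            fut = self._pending.pop(frame_id, None)
            if fut is not None and not fut.done():
                fut.set_result(payload)

    async def asend(self, payload: str) -> str:
        if asyncio.get_running_loop() is not self._loop:
            # Foreign loop: run the request on the main loop, await it here.
            cf = asyncio.run_coroutine_threadsafe(self.asend(payload), self._loop)
            return await asyncio.wrap_future(cf)
        frame_id = next(self._ids)
        fut = self._loop.create_future()
        self._pending[frame_id] = fut
        await self._outbox.put((frame_id, payload))
        return await fut

    def send(self, payload: str, timeout: float = 5.0) -> str:
        if threading.get_ident() == self._loop_thread:
            # The commit uses greenback.await_() for this case; omitted here.
            raise RuntimeError("send() would block the loop thread in this sketch")
        cf = asyncio.run_coroutine_threadsafe(self.asend(payload), self._loop)
        return cf.result(timeout)  # blocks only the calling worker thread

    async def aclose(self) -> None:
        self._reader_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._reader_task


async def fake_server(outbox: asyncio.Queue, inbox: asyncio.Queue, batch: int) -> None:
    # Collect `batch` requests, then answer them in reverse order on purpose.
    frames = [await outbox.get() for _ in range(batch)]
    for frame_id, payload in reversed(frames):
        await inbox.put((frame_id, payload.upper()))


async def demo() -> list:
    outbox, inbox = asyncio.Queue(), asyncio.Queue()
    client = MuxClient(outbox, inbox)
    server = asyncio.create_task(fake_server(outbox, inbox, 4))

    def foreign_loop_call() -> str:
        # A second event loop in a worker thread, as async_to_sync would create.
        return asyncio.run(client.asend("d"))

    results = await asyncio.gather(
        client.asend("a"),                    # main-loop caller
        asyncio.to_thread(client.send, "b"),  # sync caller in a worker thread
        client.asend("c"),                    # main-loop caller
        asyncio.to_thread(foreign_loop_call), # foreign-loop caller
    )
    await server
    await client.aclose()
    return list(results)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because every waiter is matched by frame id, the reversed reply order in
fake_server does not matter, and neither the worker thread nor the foreign
loop ever reads from the shared inbox directly.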

Add a subprocess event-loop watchdog: TriggerRunnerSupervisor stamps
_last_runner_comms on every message received from the subprocess.  If the
subprocess goes silent for longer than the new [triggerer]
runner_health_check_threshold (default 30 s), heartbeat() skips the DB
update so the scheduler sees the triggerer as unhealthy and reassigns its
triggers.  This detects a deadlocked or hung event loop that the
process-alive check alone cannot catch.
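
The watchdog logic can be illustrated with a small sketch. It assumes only
what the paragraph above states: a timestamp refreshed on every message and
a heartbeat that skips its DB update once the silence exceeds the
threshold. SupervisorSketch, FakeClock, on_message and db_heartbeats are
hypothetical names; only _last_runner_comms and the 30 s default come from
the commit message.

```python
import time


class SupervisorSketch:
    """Hypothetical stand-in for the supervisor; the logic here is assumed."""

    def __init__(self, threshold: float = 30.0, clock=time.monotonic) -> None:
        self.threshold = threshold       # seconds of allowed silence
        self._clock = clock
        self._last_runner_comms = clock()
        self.db_heartbeats = 0           # stands in for the real DB update

    def on_message(self) -> None:
        # Stamp the time of every message received from the subprocess.
        self._last_runner_comms = self._clock()

    def heartbeat(self) -> bool:
        silent_for = self._clock() - self._last_runner_comms
        if silent_for > self.threshold:
            # Skip the DB update so the triggerer appears unhealthy.
            return False
        self.db_heartbeats += 1
        return True


class FakeClock:
    """Manually advanced clock so the example does not need to sleep."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


if __name__ == "__main__":
    clock = FakeClock()
    supervisor = SupervisorSketch(threshold=30.0, clock=clock)
    clock.now = 10.0
    print(supervisor.heartbeat())  # subprocess spoke recently
    clock.now = 45.0
    print(supervisor.heartbeat())  # silent for longer than the threshold
```

Injecting the clock is a choice made for this sketch so the threshold
behaviour can be exercised without waiting 30 seconds.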

(cherry picked from commit 099fd2b03f3a4e642f75e748fbce2f3d99ffbba8)

Report URL: https://github.com/apache/airflow/actions/runs/25402301296

With regards,
GitHub Actions via GitBox


