Yicong-Huang commented on issue #4545:
URL: https://github.com/apache/texera/issues/4545#issuecomment-4333015875

   ### Root cause: #4424 (`ef66190f22` `fix: add missing context switches for repeated state processing`)
   
   CI history on `main` for the `scala (ubuntu-22.04, 11)` job:
   
   | commit | PR | result |
   | --- | --- | --- |
   | `e635dd02` | #4220 (test added) | ✅ |
   | `76480d98` … `23901be2` | (10 commits, incl. #4506, #4504, #4508, #4510, #4518, #4521, #4526, #4523, #4512, #4533, #4520, #4531) | ✅ all green |
   | **`ef66190f`** | **#4424** | ❌ first failure: `should propagate reconfiguration through a source operator in workflow` *** FAILED *** (1.minutes TimeoutException) |
   | `05b271c5` … | #4441 onward | flaky (mix of pass/fail) |
   
   #4424 changed `TupleProcessingManager.get_internal_marker()` from an atomic 
get-and-clear to a read-only get, and moved the clear to after the marker 
handler in `MainLoop._process_ecm`. It also added `_switch_context()` calls 
inside `MainLoop._process_state` and `DataProcessor.process_state`.
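
To make the change in getter semantics concrete, here is a minimal sketch of the two behaviours side by side. `ToyMarkerSlot`, `peek`, and `get_and_clear` are hypothetical names invented for illustration; this is not Texera's actual `TupleProcessingManager` code, only a model of "atomic get-and-clear" versus "read-only get".

```python
import threading
from typing import Optional


class ToyMarkerSlot:
    """Hypothetical stand-in for the marker slot; not Texera's real class."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._marker: Optional[str] = None

    def set(self, marker: str) -> None:
        with self._lock:
            self._marker = marker

    def get_and_clear(self) -> Optional[str]:
        # Pre-#4424 style: the read and the clear happen as one atomic step.
        with self._lock:
            marker, self._marker = self._marker, None
            return marker

    def peek(self) -> Optional[str]:
        # Post-#4424 style: read only; some other code path must clear later.
        with self._lock:
            return self._marker


slot = ToyMarkerSlot()

slot.set("EndChannel")
atomic_reads = [slot.get_and_clear(), slot.get_and_clear()]

slot.set("EndChannel")
peek_reads = [slot.peek(), slot.peek()]
```

With the atomic variant the second read sees `None`; with the read-only variant every read keeps returning the same marker until something else clears it.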
   
   Why it breaks this specific case:
   
   - The source-propagation test is the only ReconfigurationSpec case that runs 
**Python source → Python UDF** with the `UpdateExecutor` reconfigure ECM and 
the trailing `EndChannel` ECM both flowing through the UDF's data-channel ECM 
path. The other four cases either use a Scala/CSV source or take the single-op 
fries branch (direct control RPC).
   - When the UDF's `MainLoop` processes the `EndChannel` ECM, the handler sets 
`current_internal_marker = EndChannel()`. `_process_ecm` then dispatches to 
`_process_end_channel`, which calls `process_input_state` / 
`process_input_tuple`. Both call `_switch_context()` to hand control to the 
`DataProcessor` thread.
   - Pre-#4424 `DataProcessor.run()` read the marker via the get-and-clear 
`get_internal_marker()`, processed it once, then on the next iteration saw 
`None` and proceeded normally. Post-#4424 the marker stays set until 
`_process_ecm` clears it after `_process_end_channel` returns — but 
`_process_end_channel` cannot return until `DataProcessor` yields back, and 
`DataProcessor` keeps re-reading the same `EndChannel` marker and re-invoking 
`process_internal_marker` (`produce_state_on_finish`, `on_finish`).
   - Net effect: the UDF never reaches `complete()`, no `port_completed` 
reaches the controller, the region never terminates, and 
`Await.result(completion, Duration.fromMinutes(1))` in 
`ReconfigurationSpec.shouldReconfigure` times out.
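
The re-read behaviour described in the bullets above can be modelled with a small single-threaded simulation. This is a deliberately simplified sketch: `run_consumer` is a hypothetical helper, and it assumes the consumer only hands control back once it sees no pending marker. The iteration cap exists only so the example terminates.

```python
from typing import Callable, Optional, Tuple


def run_consumer(
    read_marker: Callable[[], Optional[str]], max_iterations: int = 5
) -> Tuple[int, bool]:
    """Toy consumer loop: returns (times the marker was handled, yielded back?)."""
    handled = 0
    for _ in range(max_iterations):
        if read_marker() is None:
            # Nothing pending: the consumer would yield back to the main loop.
            return handled, True
        handled += 1  # stands in for invoking the marker handler again
    return handled, False  # cap reached without ever yielding


state = {"marker": "EndChannel"}


def get_and_clear() -> Optional[str]:
    # Atomic consume: the next read sees None.
    marker, state["marker"] = state["marker"], None
    return marker


def peek() -> Optional[str]:
    # Read-only: the marker stays set because nothing else gets to run.
    return state["marker"]


atomic_result = run_consumer(get_and_clear)

state["marker"] = "EndChannel"
peek_result = run_consumer(peek)
```

In this model the atomic getter handles the marker once and yields, while the read-only getter handles it on every iteration and never yields, which matches the symptom of a region that never terminates.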
   
   Reverting #4424 outright is not a fix — it re-breaks the 
pause/state-processing path that #4424 was added to repair (locally, reverting 
it makes 3/5 cases time out at the 10s `pausedReached` await). The correct fix 
should keep #4424's context-switch additions but ensure the marker is consumed 
exactly once on the `DataProcessor` side, e.g. clear `current_internal_marker` 
immediately after the read in `DataProcessor.run()` (restoring atomic consume), 
or before the first `_switch_context` inside `_process_end_channel` / 
`_process_start_channel`.
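
As a rough illustration of the first option (clear immediately after the read on the `DataProcessor` side), the sketch below uses two real threads and a pair of `threading.Event` objects as a stand-in for the context switch. `ToyWorker` and its methods are hypothetical and do not mirror Texera's actual classes or its `_switch_context` mechanism; the point is only that consuming the marker right after reading it lets the main-loop side return.

```python
import threading
from typing import List, Optional


class ToyWorker:
    """Hypothetical two-thread handoff; names are illustrative only."""

    def __init__(self) -> None:
        self.current_internal_marker: Optional[str] = None
        self.handled: List[str] = []
        self._to_dp = threading.Event()    # main loop -> data processor
        self._to_main = threading.Event()  # data processor -> main loop

    def data_processor_run(self) -> None:
        self._to_dp.wait()
        while True:
            marker = self.current_internal_marker
            # Consume right after the read so the next iteration sees None.
            self.current_internal_marker = None
            if marker is None:
                break
            self.handled.append(marker)
        self._to_main.set()  # yield back to the main loop

    def process_end_channel(self, timeout: float = 2.0) -> bool:
        self.current_internal_marker = "EndChannel"
        self._to_dp.set()
        # True only if the data processor yielded back within the timeout.
        return self._to_main.wait(timeout)


worker = ToyWorker()
dp_thread = threading.Thread(target=worker.data_processor_run, daemon=True)
dp_thread.start()
returned = worker.process_end_channel()
dp_thread.join(timeout=2.0)
```

Here the marker is handled exactly once and `process_end_channel` returns. Whether the real fix should live in `DataProcessor.run()` or before the first `_switch_context` in the channel handlers is a design choice this sketch does not settle.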
   
   cc @aglinxinyuan

