Yicong-Huang commented on issue #4545: URL: https://github.com/apache/texera/issues/4545#issuecomment-4333015875
### Root cause: #4424 (`ef66190f22` `fix: add missing context switches for repeated state processing`)

CI history on `main` for the `scala (ubuntu-22.04, 11)` job:

| commit | PR | result |
| --- | --- | --- |
| `e635dd02` | #4220 (test added) | ✅ |
| `76480d98` … `23901be2` | (10 commits, incl. #4506, #4504, #4508, #4510, #4518, #4521, #4526, #4523, #4512, #4533, #4520, #4531) | ✅ all green |
| **`ef66190f`** | **#4424** | ❌ first failure: `should propagate reconfiguration through a source operator in workflow *** FAILED ***` (1.minutes TimeoutException) |
| `05b271c5` … | #4441 onward | flaky (mix of pass/fail) |

#4424 changed `TupleProcessingManager.get_internal_marker()` from an atomic get-and-clear to a read-only get, and moved the clear to after the marker handler in `MainLoop._process_ecm`. It also added `_switch_context()` calls inside `MainLoop._process_state` and `DataProcessor.process_state`.

Why it breaks this specific case:

- The source-propagation test is the only `ReconfigurationSpec` case that runs **Python source → Python UDF** with both the `UpdateExecutor` reconfigure ECM and the trailing `EndChannel` ECM flowing through the UDF's data-channel ECM path. The other four cases either use a Scala/CSV source or take the single-op fries branch (a direct control RPC).
- When the UDF's `MainLoop` processes the `EndChannel` ECM, the handler sets `current_internal_marker = EndChannel()`. `_process_ecm` then dispatches to `_process_end_channel`, which calls `process_input_state` / `process_input_tuple`; both `_switch_context()` to the `DataProcessor` thread.
- Pre-#4424, `DataProcessor.run()` read the marker via the get-and-clear `get_internal_marker()`, processed it once, then saw `None` on the next iteration and proceeded normally.
- Post-#4424, the marker stays set until `_process_ecm` clears it after `_process_end_channel` returns. But `_process_end_channel` cannot return until `DataProcessor` yields back, and `DataProcessor` keeps re-reading the same `EndChannel` marker and re-invoking `process_internal_marker` (`produce_state_on_finish`, `on_finish`).
- Net effect: the UDF never reaches `complete()`, no `port_completed` reaches the controller, the region never terminates, and `Await.result(completion, Duration.fromMinutes(1))` in `ReconfigurationSpec.shouldReconfigure` times out.

Reverting #4424 outright is not a fix: it re-breaks the pause/state-processing path that #4424 was added to repair (locally, reverting it makes 3/5 cases time out at the 10s `pausedReached` await).

The correct fix should keep #4424's context-switch additions but ensure the marker is consumed exactly once on the `DataProcessor` side, e.g. by clearing `current_internal_marker` immediately after the read in `DataProcessor.run()` (restoring the atomic consume), or before the first `_switch_context` inside `_process_end_channel` / `_process_start_channel`.

cc @aglinxinyuan
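To make the exactly-once requirement concrete, here is a deliberately simplified, single-threaded model of the hang. `MarkerSlot`, `run_data_processor`, `process_end_channel` and the `max_steps` budget are hypothetical names, not Texera's API. The real `MainLoop`/`DataProcessor` hand-off switches threads via `_switch_context()`, which this sketch collapses into a plain function call. It models only the first proposed fix (atomic consume in `DataProcessor.run()`).

```python
from dataclasses import dataclass
from typing import Optional


class EndChannel:
    """Hypothetical stand-in for the EndChannel ECM marker."""


@dataclass
class MarkerSlot:
    # Hypothetical stand-in for TupleProcessingManager.current_internal_marker.
    current_internal_marker: Optional[object] = None

    def get_and_clear(self) -> Optional[object]:
        # Pre-#4424 semantics: read and clear in one step (atomic consume).
        marker, self.current_internal_marker = self.current_internal_marker, None
        return marker

    def peek(self) -> Optional[object]:
        # Post-#4424 semantics: read-only get; someone else must clear it later.
        return self.current_internal_marker


def run_data_processor(slot: MarkerSlot, consume: bool, max_steps: int = 100) -> int:
    """Simulated DataProcessor loop: keeps going while a marker is pending and
    returns (i.e. yields back to MainLoop) once it reads None. Returns how many
    times the marker was processed; raises if the step budget runs out, which
    stands in for the 1-minute test timeout."""
    on_finish_calls = 0
    for _ in range(max_steps):
        marker = slot.get_and_clear() if consume else slot.peek()
        if marker is None:
            return on_finish_calls  # nothing pending: yield back to MainLoop
        on_finish_calls += 1  # process_internal_marker -> on_finish
    raise TimeoutError(f"marker re-processed {on_finish_calls} times; never yielded back")


def process_end_channel(consume: bool) -> int:
    slot = MarkerSlot()
    slot.current_internal_marker = EndChannel()  # set by the ECM handler
    calls = run_data_processor(slot, consume)    # the hand-off to DataProcessor
    # Post-#4424 clear in _process_ecm: reached only if DataProcessor yields back.
    slot.current_internal_marker = None
    return calls


if __name__ == "__main__":
    print(process_end_channel(consume=True))  # → 1 (EndChannel handled exactly once)
    try:
        process_end_channel(consume=False)
    except TimeoutError as exc:
        print(exc)  # → marker re-processed 100 times; never yielded back
```

In this model the clear that #4424 placed in `_process_ecm` sits after the hand-off, so with a read-only get it can never run. Moving the consume back to the reader (`consume=True`) is what lets the loop observe `None` and return.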
