aglinxinyuan opened a new issue, #4559:
URL: https://github.com/apache/texera/issues/4559
### What happened?
`MainLoop.process_input_state`
(`amber/src/main/python/core/runnables/main_loop.py:192-205`) reads
`current_output_state` *between* its two `_switch_context()` calls:
```python
def process_input_state(self) -> None:
    self._switch_context()  # (1)
    output_state = self.context.state_processing_manager.get_output_state()
    self._switch_context()  # (2)
    if output_state is not None:
        for to, batch in self.context.output_manager.emit_state(output_state):
            ...  # emit
```
Switch (1) returns as soon as DataProcessor exits its previous task and
parks at the run loop's end-of-body switch (`data_processor.py:65`). At that
point the executor for the *new* state has not run yet. The actual
`executor.process_state(...)` call happens *during* switch (2): DataProcessor
resumes from its wait at line 65, iterates the run loop, consumes
`current_input_state`, runs the executor, writes `current_output_state`, and
then hands control back to MainLoop via `process_state`'s
`finally: self._switch_context()` (`data_processor.py:111-112`).
Net effect: the local `output_state` captured at line 194 is the *previous*
cycle's value (or `None` for the very first state). Each state's output is
emitted one cycle late — state[i]'s processed output is emitted while MainLoop
processes state[i+1].
For state DataElements this is *mostly* invisible because the last state
gets flushed when EndChannel arrives via `_process_end_channel ->
process_input_state`. But `process_internal_marker` for `EndChannel`
(`data_processor.py:75-80`) writes its own state into the same slot:
```python
elif isinstance(internal_marker, EndChannel):
    self._set_output_state(executor.produce_state_on_finish(port_id))
    self._switch_context()  # line 79
    self._set_output_tuple(executor.on_finish(port_id))
```
That `_set_output_state(produce_state_on_finish(...))` runs *during*
MainLoop's switch (2). MainLoop's local `output_state` was already captured
before switch (2) started, so it holds state[N]'s processed output, which is
correctly emitted. After `process_input_state` returns, `_process_end_channel`
proceeds into `process_input_tuple` / `port_completed` / `complete()`, none of
which read `current_output_state` again. **The finish-state is silently
dropped.**
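To make the ordering concrete, here is a toy single-threaded model; it is not Texera code. Python generators stand in for the two threads, each `next()`/`yield` pair stands in for one `_switch_context()` handoff, and a plain dict stands in for `state_processing_manager`. The names `data_processor`, `run`, the `"task"` key and the `"finish_state"` value are hypothetical. It also assumes that `get_output_state()` clears the slot when read, and it folds the EndChannel marker into the same task slot for brevity.

```python
from typing import Any, Dict, Generator, List, Optional, Tuple

# Hypothetical task encoding: ("state", payload), or ("end", None) for EndChannel.
Task = Tuple[str, Optional[str]]


def data_processor(ctx: Dict[str, Any]) -> Generator[None, None, None]:
    """Toy DataProcessor: each `yield` stands for one `_switch_context()`."""
    while True:
        # Run-loop end-of-body switch (data_processor.py:65): park here.
        yield
        # Resumed by MainLoop's switch (2): only now is the new task handled.
        kind, payload = ctx["task"]
        if kind == "state":
            # Stand-in for executor.process_state(...) writing the shared slot.
            ctx["current_output_state"] = f"processed({payload})"
        else:
            # Stand-in for _set_output_state(produce_state_on_finish(port_id)).
            ctx["current_output_state"] = "finish_state"
        # process_state's `finally: _switch_context()` (or the line-79 switch
        # for EndChannel): hand control back to MainLoop.
        yield


def process_input_state(
    ctx: Dict[str, Any], dp: Generator[None, None, None]
) -> Optional[str]:
    next(dp)  # switch (1): DP leaves its previous task and parks at line 65
    # Assumption: get_output_state() returns the slot's value and clears it.
    output_state = ctx.pop("current_output_state", None)
    next(dp)  # switch (2): the executor for the NEW task runs only here
    return output_state  # what MainLoop would emit as a StateFrame this cycle


def run(tasks: List[Task]) -> Tuple[List[Optional[str]], Optional[str]]:
    """Drive one cycle per task; return (emitted per cycle, unread slot value)."""
    ctx: Dict[str, Any] = {}
    dp = data_processor(ctx)
    per_cycle: List[Optional[str]] = []
    for task in tasks:
        ctx["task"] = task
        per_cycle.append(process_input_state(ctx, dp))
    return per_cycle, ctx.get("current_output_state")


per_cycle, leftover = run([("state", "s0"), ("state", "s1"), ("end", None)])
print(per_cycle)  # [None, 'processed(s0)', 'processed(s1)']
print(leftover)   # finish_state
```

Each cycle returns what was written during the *previous* cycle's switch (2), which is the one-cycle lag described above; the EndChannel cycle flushes `processed(s1)` and leaves `finish_state` in the slot with nothing left to read it.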
This is invisible with the default `Operator.produce_state_on_finish`
(returns `None`) and with `EchoOperator` for the same reason — which is why the
existing `test_main_loop_thread_can_process_messages` does not catch it. It
bites any operator that needs to emit a final state when its input port closes
(state-aggregation, summary, terminal-reduce, anything that flushes accumulated
state on completion).
**Expected:** when an operator's `produce_state_on_finish(port)` returns a
non-`None` `State`, that state is emitted as a `StateFrame` to downstream
channels.
**Actual:** the state is written to `current_output_state`, then either
overwritten or left unread by the next `process_input_state` call (or never
read at all, if no further state cycle follows). No `StateFrame` carrying it
appears on the output queue.
### How to reproduce?
Wire a custom executor whose `produce_state_on_finish` returns a non-`None`
state, send a state followed by `EndChannel`, and watch the output queue:
```python
# Assumes Texera's State class is already imported in the test module.
class FinishStateOperator:
    @staticmethod
    def process_tuple(tuple_, port):
        yield tuple_

    @staticmethod
    def process_state(state, port):
        return state

    @staticmethod
    def produce_state_on_finish(port):
        # Non-None finish state: this is what should reach downstream.
        s = State()
        s.add("finish_marker", "ran")
        return s

    @staticmethod
    def on_finish(port):
        yield

    @staticmethod
    def close():
        pass
```
Drive it through MainLoop with state DataElements followed by an EndChannel
ECM. Disable data on the output queue and drain the control-channel replies
(`port_completed × 2` + `worker_execution_completed`); re-enable data and pull
from the output queue. Expected: a `StateFrame` with `finish_marker == "ran"`.
Actual: only the (lagged) state DataElements, then nothing —
`output_queue.get()` blocks.
A reproducing pytest case is `test_main_loop_thread_can_process_state` on
[aglinxinyuan/texera#state-handshake-redesign](https://github.com/aglinxinyuan/texera/tree/state-handshake-redesign):
on plain `main` it times out at the `output_queue.get()` for the
post-EndChannel `StateFrame`; on the same branch with the handshake redesign
applied it passes. The branch also has a candidate fix (collapsing the
read-between-two-switches pattern to a single switch and reading after it).
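The shape of a single-switch fix can be sketched with a toy generator model. This is not Texera code and not the branch's actual patch: generators stand in for the two threads, each `next()`/`yield` pair stands in for one `_switch_context()`, a dict stands in for `state_processing_manager`, and the names `data_processor`, `run`, the `"task"` key and `"finish_state"` are hypothetical. It assumes `get_output_state()` clears the slot when read and models the EndChannel marker as just another task.

```python
from typing import Any, Dict, Generator, List, Optional, Tuple

# Hypothetical task encoding: ("state", payload), or ("end", None) for EndChannel.
Task = Tuple[str, Optional[str]]


def data_processor(ctx: Dict[str, Any]) -> Generator[None, None, None]:
    """Toy DataProcessor with exactly one handoff back to MainLoop per task."""
    while True:
        kind, payload = ctx["task"]
        if kind == "state":
            # Stand-in for executor.process_state(...) writing the shared slot.
            ctx["current_output_state"] = f"processed({payload})"
        else:
            # Stand-in for _set_output_state(produce_state_on_finish(port_id)).
            ctx["current_output_state"] = "finish_state"
        yield  # single switch back once this task's output has been written


def process_input_state(
    ctx: Dict[str, Any], dp: Generator[None, None, None]
) -> Optional[str]:
    next(dp)  # single switch: DP runs the executor for the CURRENT task
    # Read strictly after the switch returns, so the write is already visible.
    return ctx.pop("current_output_state", None)


def run(tasks: List[Task]) -> Tuple[List[Optional[str]], Optional[str]]:
    """Drive one cycle per task; return (emitted per cycle, unread slot value)."""
    ctx: Dict[str, Any] = {}
    dp = data_processor(ctx)
    per_cycle: List[Optional[str]] = []
    for task in tasks:
        ctx["task"] = task
        per_cycle.append(process_input_state(ctx, dp))
    return per_cycle, ctx.get("current_output_state")


per_cycle, leftover = run([("state", "s0"), ("state", "s1"), ("end", None)])
print(per_cycle)  # ['processed(s0)', 'processed(s1)', 'finish_state']
print(leftover)   # None
```

The design point is ordering: the read happens after the producer's write for the same task. With a read wedged between two switches, the read always observes the previous task's write, and the final write has no later read to catch it.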
### Version
1.1.0-incubating (Pre-release/Master)
### Commit Hash (Optional)
92affb56c686b5ed4fc6780bd23e798157865ed4