aglinxinyuan opened a new issue, #4559:
URL: https://github.com/apache/texera/issues/4559

   ### What happened?
   
   `MainLoop.process_input_state` 
(`amber/src/main/python/core/runnables/main_loop.py:192-205`) reads 
`current_output_state` *between* its two `_switch_context()` calls:
   
   ```python
   def process_input_state(self) -> None:
       self._switch_context()  # (1)
       output_state = self.context.state_processing_manager.get_output_state()
       self._switch_context()  # (2)
       if output_state is not None:
           for to, batch in self.context.output_manager.emit_state(output_state):
               ...emit...
   ```
   
   Switch (1) returns the moment DataProcessor exits its previous task and
   parks at the run loop's end-of-body switch (`data_processor.py:65`). At that
   point the executor for the *new* state has not run yet. The actual
   `executor.process_state(...)` call happens *during* switch (2): DataProcessor
   wakes from its wait at line 65, iterates the run loop, consumes
   `current_input_state`, runs the executor, writes `current_output_state`, then
   hands control back to MainLoop via `process_state`'s
   `finally: self._switch_context()` (`data_processor.py:111-112`).
   
   Net effect: the local `output_state` captured at line 194 is the *previous* 
cycle's value (or `None` for the very first state). Each state's output is 
emitted one cycle late — state[i]'s processed output is emitted while MainLoop 
processes state[i+1].
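
The one-cycle lag can be reproduced in a toy model that has no Texera dependencies. This is only a sketch under stated assumptions: the DataProcessor thread is replaced by a generator, each `yield` or `next(dp)` stands in for one `_switch_context()` call, and `ctx`, `emitted`, and the string payloads are hypothetical names introduced for illustration.

```python
from types import SimpleNamespace


def data_processor(ctx):
    """Toy stand-in for DataProcessor's run loop; each yield models a switch."""
    while True:
        yield  # parks at the end-of-body switch (data_processor.py:65)
        ctx.output_state = f"processed({ctx.input_state})"  # executor.process_state
        yield  # process_state's `finally: self._switch_context()`


def process_input_state(ctx, dp, emitted):
    """Mirrors the read-between-two-switches shape of the real method."""
    next(dp)                         # switch (1): DataProcessor parks at line 65
    output_state = ctx.output_state  # read happens before the executor has run
    next(dp)                         # switch (2): executor runs, slot is written
    if output_state is not None:
        emitted.append(output_state)


ctx = SimpleNamespace(input_state=None, output_state=None)
dp = data_processor(ctx)
emitted = []
for state in ["s0", "s1", "s2"]:
    ctx.input_state = state
    process_input_state(ctx, dp, emitted)

print(emitted)           # ['processed(s0)', 'processed(s1)']
print(ctx.output_state)  # processed(s2), still sitting in the slot
```

After three input states only two outputs have been emitted, and the third is left in the slot waiting for a later call to pick it up.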
   
   For state DataElements this is *mostly* invisible because the last state 
gets flushed when EndChannel arrives via `_process_end_channel -> 
process_input_state`. But `process_internal_marker` for `EndChannel` 
(`data_processor.py:75-80`) writes its own state into the same slot:
   
   ```python
   elif isinstance(internal_marker, EndChannel):
       self._set_output_state(executor.produce_state_on_finish(port_id))
       self._switch_context()                       # line 79
       self._set_output_tuple(executor.on_finish(port_id))
   ```
   
   That `_set_output_state(produce_state_on_finish(...))` runs *during* 
MainLoop's switch (2). MainLoop's local `output_state` was already captured 
before switch (2) started, so it holds state[N]'s processed output, which is 
correctly emitted. After `process_input_state` returns, `_process_end_channel` 
proceeds into `process_input_tuple` / `port_completed` / `complete()`, none of 
which read `current_output_state` again. **The finish-state is silently 
dropped.**
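
The same kind of toy model shows the drop. It is a sketch, not the real worker: a generator stands in for the DataProcessor thread, `process_input_tuple` is modeled as a single extra switch (an assumption), and `ctx`, `FINISH`, and the string payloads are hypothetical.

```python
from types import SimpleNamespace

FINISH = "finish_state"  # stands in for executor.produce_state_on_finish(port_id)


def data_processor(ctx):
    while True:
        yield  # end-of-body switch (data_processor.py:65)
        kind, payload = ctx.inbox
        if kind == "state":
            ctx.output_state = f"processed({payload})"
            yield  # process_state's finally switch
        else:  # EndChannel
            ctx.output_state = FINISH
            yield  # the switch at line 79
            ctx.output_tuple = "on_finish output"


def process_input_state(ctx, dp, emitted):
    next(dp)                         # switch (1)
    output_state = ctx.output_state  # captured before switch (2)
    next(dp)                         # switch (2): FINISH is written here on EndChannel
    if output_state is not None:
        emitted.append(output_state)


def process_end_channel(ctx, dp, emitted):
    process_input_state(ctx, dp, emitted)
    next(dp)  # models process_input_tuple: only output_tuple is consumed
    # port_completed / complete() would follow; neither reads output_state


ctx = SimpleNamespace(inbox=None, output_state=None, output_tuple=None)
dp = data_processor(ctx)
emitted = []
for message in [("state", "s0"), ("state", "s1")]:
    ctx.inbox = message
    process_input_state(ctx, dp, emitted)
ctx.inbox = ("end", None)
process_end_channel(ctx, dp, emitted)

print(emitted)           # ['processed(s0)', 'processed(s1)']
print(ctx.output_state)  # finish_state, written but never emitted
```

The EndChannel call flushes the lagged `processed(s1)`, while the finish-state stays in the slot with no reader left.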
   
   This is invisible with the default `Operator.produce_state_on_finish` 
(returns `None`) and with `EchoOperator` for the same reason — which is why the 
existing `test_main_loop_thread_can_process_messages` does not catch it. It 
bites any operator that needs to emit a final state when its input port closes 
(state-aggregation, summary, terminal-reduce, anything that flushes accumulated 
state on completion).
   
   **Expected:** when an operator's `produce_state_on_finish(port)` returns a
   non-`None` `State`, that state is emitted as a `StateFrame` to downstream
   channels.
   **Actual:** the state is written to `current_output_state`, but it is either
   overwritten before the next `process_input_state` call reads it or, if no
   further state cycle follows, never read at all. No `StateFrame` carrying it
   appears on the output queue.
   
   ### How to reproduce?
   
   Wire a custom executor whose `produce_state_on_finish` returns a non-`None` 
state, send a state followed by `EndChannel`, and watch the output queue:
   
   ```python
   class FinishStateOperator:
       @staticmethod
       def process_tuple(tuple_, port): yield tuple_
       @staticmethod
       def process_state(state, port): return state
       @staticmethod
       def produce_state_on_finish(port):
           s = State()
           s.add("finish_marker", "ran")
           return s
       @staticmethod
       def on_finish(port): yield
       @staticmethod
       def close(): pass
   ```
   
   Drive it through MainLoop with state DataElements followed by an EndChannel 
ECM. Disable data on the output queue and drain the control-channel replies 
(`port_completed × 2` + `worker_execution_completed`); re-enable data and pull 
from the output queue. Expected: a `StateFrame` with `finish_marker == "ran"`. 
Actual: only the (lagged) state DataElements, then nothing — 
`output_queue.get()` blocks.
   
   A reproducing pytest case is `test_main_loop_thread_can_process_state` on 
[aglinxinyuan/texera#state-handshake-redesign](https://github.com/aglinxinyuan/texera/tree/state-handshake-redesign):
 on plain `main` it times out at the `output_queue.get()` for the 
post-EndChannel `StateFrame`; on the same branch with the handshake redesign 
applied it passes. The branch also has a candidate fix (collapsing the 
read-between-two-switches pattern to a single switch and reading after it).
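
The single-switch idea can be illustrated in a toy generator model. This is a sketch of the general shape only, and the actual change on the branch may differ: it assumes the DataProcessor side hands control back exactly once per message, and `ctx`, `FINISH`, and the string payloads are hypothetical names.

```python
from types import SimpleNamespace

FINISH = "finish_state"  # stands in for executor.produce_state_on_finish(port_id)


def data_processor(ctx):
    """Toy DataProcessor that hands control back exactly once per message."""
    while True:
        kind, payload = ctx.inbox
        if kind == "state":
            ctx.output_state = f"processed({payload})"
        else:  # EndChannel
            ctx.output_state = FINISH
        yield  # the single hand-back to MainLoop


def process_input_state(ctx, dp, emitted):
    ctx.output_state = None          # clear so a stale value cannot be re-emitted
    next(dp)                         # single switch: the executor runs to completion
    output_state = ctx.output_state  # read *after* the switch
    if output_state is not None:
        emitted.append(output_state)


ctx = SimpleNamespace(inbox=None, output_state=None)
dp = data_processor(ctx)
emitted = []
for message in [("state", "s0"), ("state", "s1"), ("end", None)]:
    ctx.inbox = message
    process_input_state(ctx, dp, emitted)

print(emitted)  # ['processed(s0)', 'processed(s1)', 'finish_state']
```

With the read placed after the switch, each output is emitted in the same cycle that produced it, and the finish-state is emitted too.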
   
   ### Version
   
   1.1.0-incubating (Pre-release/Master)
   
   ### Commit Hash (Optional)
   
   92affb56c686b5ed4fc6780bd23e798157865ed4

