aglinxinyuan commented on code in PR #4560:
URL: https://github.com/apache/texera/pull/4560#discussion_r3165761207
##########
amber/src/main/python/core/runnables/data_processor.py:
##########
@@ -49,20 +49,17 @@ def run(self) -> None:
with self._context.tuple_processing_manager.context_switch_condition:
self._context.tuple_processing_manager.context_switch_condition.wait()
self._running.set()
- self._switch_context()
while self._running.is_set():
- marker =
self._context.tuple_processing_manager.get_internal_marker()
- state = self._context.state_processing_manager.get_input_state()
- tuple_ = self._context.tuple_processing_manager.current_input_tuple
- if marker is not None:
- self.process_internal_marker(marker)
- elif state is not None:
- self.process_state(state)
- elif tuple_ is not None:
+ tpm = self._context.tuple_processing_manager
+ spm = self._context.state_processing_manager
+ if tpm.current_internal_marker is not None:
+ self.process_internal_marker(tpm.get_internal_marker())
+ elif spm.current_input_state is not None:
+ self.process_state(spm.get_input_state())
Review Comment:
Done as a runtime check at the top of `DataProcessor.run`'s loop body in
`d890004c11`:
```python
queued = has_marker + has_state + has_tuple
if queued != 1:
raise RuntimeError(
"DataProcessor expected exactly one queued input per "
f"iteration, got marker={has_marker}, state={has_state}, "
f"tuple={has_tuple}"
)
```
You're right that no two of `current_internal_marker` /
`current_input_state` / `current_input_tuple` should be set at the same tick —
`MainLoop` is single-threaded and the three are populated on disjoint code
paths (`_process_state`, `_process_tuple`, ECM-handler-via-`_process_ecm`) that
each switch to `DataProcessor` immediately after writing their slot. The
runtime check makes that contract explicit and fails loud if it's ever violated.
So the if/elif order in the loop is just dispatch on which slot is set, not
a real priority — at most (and always exactly) one branch is reachable per
iteration.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]