aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3407078310
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -189,20 +253,52 @@ def process_input_tuple(self) -> None:
output_tuple
)
- def process_input_state(self) -> None:
+ def process_input_state(
+ self,
+ output_loop_counter: int = 0,
+ output_loop_start_id: str = "",
+ output_loop_start_state_uri: str = "",
+ ) -> None:
self._switch_context()
output_state = self.context.state_processing_manager.get_output_state()
if output_state is not None:
- for to, batch in self.context.output_manager.emit_state(output_state):
- self._output_queue.put(
- DataElement(
- tag=ChannelIdentity(
- ActorVirtualIdentity(self.context.worker_id), to, False
- ),
- payload=batch,
- )
+ executor = self.context.executor_manager.executor
+ if isinstance(executor, LoopEndOperator):
+ self.context.output_manager.reset_storage()
Review Comment:
You're right — the code is correct and the description was stale (and,
worse, named the wrong branch). Fixed the description, and added a call-site
comment in 3f11520450.
`reset_output_storage()` fires **once per iteration for every Loop End**, on
the matching-loop consume (`loop_counter == 0`). The nested pass-through
(`loop_counter > 0`) is forwarded and returned in `_process_state_frame`
*before* `process_input_state` runs, so it never resets. The old description
had this backwards on three counts: it said inner-LoopEnd-only (it's every
LoopEnd), it named the `loop_counter > 0` pass-through branch (reset is
actually the `== 0` consume branch), and it claimed single loops never reset
(they reset every iteration). The method's own docstring already states the
correct behavior — *"once per loop iteration … each iteration must start from
empty tables so the materialization holds only the final iteration's rows"* —
so the description was the only wrong artifact.
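As a rough illustration of the ordering described above, here is a minimal sketch. `StateFrame`, `LoopEndSketch`, and the `forwarded` list are hypothetical stand-ins, not the Texera classes, and the sketch assumes the pass-through forwards the frame unchanged; only the branch order (forward and return on `loop_counter > 0`, reset on `loop_counter == 0`) is taken from the explanation.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StateFrame:
    # Hypothetical stand-in for a loop state frame; only the counter matters here.
    loop_counter: int


@dataclass
class LoopEndSketch:
    # Hypothetical model of a Loop End worker, not the actual implementation.
    resets: int = 0
    forwarded: List[StateFrame] = field(default_factory=list)

    def reset_output_storage(self) -> None:
        # Stands in for dropping and recreating the output tables.
        self.resets += 1

    def process_input_state(self) -> None:
        # Matching-loop consume: reached only when loop_counter == 0.
        self.reset_output_storage()

    def process_state_frame(self, frame: StateFrame) -> None:
        if frame.loop_counter > 0:
            # Nested pass-through: forward and return before
            # process_input_state runs, so no reset happens here.
            self.forwarded.append(frame)
            return
        self.process_input_state()


worker = LoopEndSketch()
for counter in [0, 1, 0, 2, 0]:
    worker.process_state_frame(StateFrame(counter))
print(worker.resets, len(worker.forwarded))
```

In this sketch the three frames with `loop_counter == 0` each trigger one reset, while the two pass-through frames are forwarded without touching storage.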
Why reset-every-iteration is correct, not a bug: dropping and recreating the
Loop End's output each iteration is what keeps the materialization at the
**final** iteration's rows instead of all iterations concatenated. Gating the
reset to nested inner loops only would break the single-loop case, so no code
change is needed.
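To make the difference concrete, here is a toy model of the materialization, assuming the output can be modeled as a plain list of rows. `materialize` and its `reset_each_iteration` flag are hypothetical names used only for illustration.

```python
from typing import List


def materialize(iterations: List[List[int]], reset_each_iteration: bool) -> List[int]:
    """Toy model: append each iteration's rows to a table, optionally resetting first."""
    table: List[int] = []
    for rows in iterations:
        if reset_each_iteration:
            # Models dropping and recreating the output at the start of an iteration.
            table = []
        table.extend(rows)
    return table


iterations = [[1, 2], [3, 4], [5, 6]]
print(materialize(iterations, reset_each_iteration=True))   # [5, 6]
print(materialize(iterations, reset_each_iteration=False))  # [1, 2, 3, 4, 5, 6]
```

With the reset, only the final iteration's rows remain; without it, the rows from every iteration accumulate.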
Done:
* PR description "Worker output" row rewritten to match the code + docstring.
* Added a comment at the `reset_output_storage()` call site in
`process_input_state` documenting the firing condition (where you were
reading), so the next reader doesn't hit the same ambiguity.
No behavior change. 33 main_loop + output_manager tests still pass.