aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3407078310


##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -189,20 +253,52 @@ def process_input_tuple(self) -> None:
                     output_tuple
                 )
 
-    def process_input_state(self) -> None:
+    def process_input_state(
+        self,
+        output_loop_counter: int = 0,
+        output_loop_start_id: str = "",
+        output_loop_start_state_uri: str = "",
+    ) -> None:
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
         if output_state is not None:
-            for to, batch in self.context.output_manager.emit_state(output_state):
-                self._output_queue.put(
-                    DataElement(
-                        tag=ChannelIdentity(
-                            ActorVirtualIdentity(self.context.worker_id), to, False
-                        ),
-                        payload=batch,
-                    )
+            executor = self.context.executor_manager.executor
+            if isinstance(executor, LoopEndOperator):
+                self.context.output_manager.reset_storage()

Review Comment:
   You're right: the code is correct and the description was stale (and, worse, named the wrong branch). I fixed the description and added a call-site comment in 3f11520450.
   
   `reset_output_storage()` fires **once per iteration for every Loop End**, on the matching-loop consume (`loop_counter == 0`). The nested pass-through (`loop_counter > 0`) is forwarded and returned in `_process_state_frame` *before* `process_input_state` runs, so it never resets. The old description got this wrong on three counts: it said the reset applies only to the inner Loop End (it applies to every Loop End), it named the `loop_counter > 0` pass-through branch (the reset is actually in the `== 0` consume branch), and it claimed single loops never reset (they reset every iteration). The method's own docstring already states the correct behavior (*"once per loop iteration … each iteration must start from empty tables so the materialization holds only the final iteration's rows"*), so the description was the only wrong artifact.
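
   A minimal sketch of the dispatch described above may make the firing condition easier to see. It is a hypothetical model, not Texera's code. `StateFrame` and `LoopEndWorkerSketch` are invented names, and only `loop_counter` is modeled. The pass-through frame is forwarded unchanged, since how the real code adjusts the counter is not shown here. The storage reset is reduced to a counter.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class StateFrame:
    # Hypothetical stand-in for a state message; only loop_counter is modeled.
    loop_counter: int


@dataclass
class LoopEndWorkerSketch:
    """Hypothetical model of a Loop End worker's state dispatch."""

    forwarded: List[StateFrame] = field(default_factory=list)
    resets: int = 0

    def _process_state_frame(self, frame: StateFrame) -> None:
        if frame.loop_counter > 0:
            # Nested pass-through: forward and return early, so
            # process_input_state (and therefore the reset) never runs.
            self.forwarded.append(frame)
            return
        # loop_counter == 0: the frame belongs to this Loop End's own loop.
        self.process_input_state()

    def process_input_state(self) -> None:
        # Stands in for the storage reset: one increment per consumed iteration.
        self.resets += 1


if __name__ == "__main__":
    worker = LoopEndWorkerSketch()
    # Three iterations of this loop interleaved with two pass-through frames.
    for counter in [0, 1, 0, 2, 0]:
        worker._process_state_frame(StateFrame(loop_counter=counter))
    print(worker.resets, len(worker.forwarded))  # 3 2
```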
   
   Why resetting every iteration is correct, not a bug: dropping and recreating the Loop End's output each iteration is what keeps the materialization at the **final** iteration's rows instead of all iterations concatenated. Restricting the reset to the nested inner Loop End would break the single-loop case, because a single loop's output would then accumulate every iteration's rows. So there is no code change.
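
   The effect of the per-iteration reset can be sketched with a toy in-memory table. This assumes the Loop End's materialization behaves like an append-only list of rows. `OutputStorageSketch` and `run_loop` are hypothetical names and do not correspond to Texera's storage API.

```python
from typing import Dict, List

Row = Dict[str, int]


class OutputStorageSketch:
    """Hypothetical in-memory stand-in for a Loop End's materialized output."""

    def __init__(self) -> None:
        self.rows: List[Row] = []

    def reset(self) -> None:
        # Drop and recreate: the table starts empty for the next iteration.
        self.rows = []

    def write(self, batch: List[Row]) -> None:
        # Append-only writes, as assumed for the materialization.
        self.rows.extend(batch)


def run_loop(iterations: int, reset_each_iteration: bool) -> List[Row]:
    storage = OutputStorageSketch()
    for i in range(iterations):
        if reset_each_iteration:
            storage.reset()
        # Each iteration emits one row tagged with its iteration number.
        storage.write([{"iteration": i}])
    return storage.rows


if __name__ == "__main__":
    print(run_loop(3, reset_each_iteration=True))   # [{'iteration': 2}]
    print(run_loop(3, reset_each_iteration=False))  # [{'iteration': 0}, {'iteration': 1}, {'iteration': 2}]
```

   With the reset, three iterations leave only the last iteration's row. Without it, every iteration's rows are concatenated.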
   
   Done:
   * Rewrote the "Worker output" row of the PR description to match the code and docstring.
   * Added a comment at the `reset_output_storage()` call site in `process_input_state` (where you were reading) documenting the firing condition, so the next reader doesn't hit the same ambiguity.
   
   No behavior change; the 33 `main_loop` and `output_manager` tests still pass.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.