aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3407425925


##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:
+        port_id = self.get_port_ids()[0]
+        storage_uri_base = self._storage_uri_base
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(

Review Comment:
   Good catch — this is a real latent bug, deeper than the apparent 
contradiction. Fixed in e61681d695.
   
   The two paths don't actually conflict at runtime, because **the Python reset 
never fires**. A Loop End's generated `process_state` returns `None`, and 
`produce_state_on_finish` isn't overridden, so it yields `None` too. 
`output_state` is therefore always `None` for a Loop End, and 
`reset_output_storage()` sat under `if output_state is not None:`. On top of 
that, the call was hooked into `process_input_state` (the consume path, 
`loop_counter == 0`), not the outer pass-through (`loop_counter > 0`) where it 
belongs. So your parenthetical was sharper than it looked: the code doesn't 
match the description's "inner Loop End only" claim, and beyond that the call 
was effectively dead for *every* Loop End.
   
   So today every Loop End just **accumulates**, which is actually correct for 
a single / outermost loop (the scheduler's `reusesOutputStorageOnReExecution` 
keeps the doc across re-runs and the writer appends — that's the 
`RegionExecutionCoordinator` "would erase what we just wrote" path, and it's 
right). The gap is the **nested** case: an inner Loop End should accumulate 
only within the current outer iteration and reset when the outer loop advances. 
With the reset dead, the inner Loop End accumulated across all outer iterations 
(9 rows in the 3×3 case instead of 3).
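
   To make the 9-vs-3 arithmetic concrete, here is a toy model of the inner 
Loop End's table, not the actual Texera code. The function name, the 
`reset_on_outer_advance` flag, and the one-row-per-inner-iteration assumption 
are all hypothetical and exist only for this illustration.

```python
def materialized_rows(
    outer_iters: int, inner_iters: int, reset_on_outer_advance: bool
) -> int:
    """Toy model: rows left in the inner Loop End's table after all iterations."""
    inner_end_table: list[tuple[int, int]] = []
    for outer in range(outer_iters):
        # Hypothetical stand-in for the reset on the outer pass-through.
        if reset_on_outer_advance:
            inner_end_table.clear()
        for inner in range(inner_iters):
            # Assumption: each inner iteration materializes exactly one row.
            inner_end_table.append((outer, inner))
    return len(inner_end_table)


print(materialized_rows(3, 3, reset_on_outer_advance=False))  # 9 (reset dead)
print(materialized_rows(3, 3, reset_on_outer_advance=True))   # 3 (reset firing)
```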
   
   Also — my earlier description edit (the one that prompted this) was wrong: 
I'd "corrected" the description toward the buggy code. The *original* wording 
(inner Loop End of a nested loop, `loop_counter > 0` pass-through) described 
the intent correctly, and I've restored it.
   
   **Fix:**
   - Move `reset_output_storage()` to the inner-Loop-End pass-through branch in 
`_process_state_frame` (`loop_counter > 0`). The input reader replays all 
states before any data each region execution, so the tables still hold the 
*previous* outer iteration's rows when the outer boundary state passes through 
— clearing there makes each outer iteration accumulate from empty.
   - It fires exactly **once per outer iteration**: each loop operator is its 
own region, so the inner Loop Start's region doesn't carry 
`reusesOutputStorageOnReExecution` and its output is recreated on every inner 
back-edge — the outer pass-through therefore only reaches the inner Loop End on 
the first inner iteration of each outer iteration. A single / outermost Loop 
End never sees `loop_counter > 0`, so it never resets.
   - Removed the dead consume-path call; corrected the `reset_output_storage` 
docstring, the call-site comment, and the `RegionExecutionCoordinator` comment. 
The Scala side is unchanged: it provides the base per-loop accumulation, and 
this reset is the nested-loop exception to it.
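
   The control-flow change can be sketched roughly as below. This is a 
simplified model under stated assumptions, not the code in the commit: 
`FakeLoopEnd` and the two `handle_state_*` functions are hypothetical names, 
and only the `loop_counter` branching and the `output_state is not None` guard 
are taken from the description above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeLoopEnd:
    """Hypothetical stand-in for a Loop End executor plus its output storage."""

    resets: int = 0
    operator_calls: int = 0

    def process_state(self, state: dict) -> Optional[dict]:
        self.operator_calls += 1
        return None  # a Loop End yields no output state

    def reset_output_storage(self) -> None:
        self.resets += 1


def handle_state_before_fix(op: FakeLoopEnd, state: dict, loop_counter: int) -> None:
    if loop_counter == 0:  # consume path
        output_state = op.process_state(state)
        if output_state is not None:
            op.reset_output_storage()  # never reached: output_state is always None


def handle_state_after_fix(op: FakeLoopEnd, state: dict, loop_counter: int) -> None:
    if loop_counter > 0:  # outer boundary state passing through the inner Loop End
        op.reset_output_storage()
        return  # pass-through: the operator itself is not invoked
    op.process_state(state)  # consume path: no reset


before = FakeLoopEnd()
after = FakeLoopEnd()
for counter in (0, 1):
    handle_state_before_fix(before, {}, counter)
    handle_state_after_fix(after, {}, counter)

print(before.resets, after.resets)  # 0 1
```

   In this model the pre-fix handler never resets for either counter value, 
while the post-fix handler resets once on the pass-through and invokes the 
operator only on the consume path.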
   
   **Tests:**
   - Unit (`test_main_loop`): the inner pass-through triggers 
`reset_output_storage` once and doesn't invoke the operator; the consume path 
(single loop) and a Loop Start pass-through never reset.
   - Integration (`LoopIntegrationSpec`): assert the **materialized** result 
row counts — single loop = 3 (accumulate), nested inner Loop End = 3 (not 9), 
nested outer Loop End = 9. The pre-existing cumulative output-tuple counts 
can't distinguish accumulate from reset, which is why this slipped through. 
Verified locally via the unit tests + ruff; the materialized integration 
assertions run in the `amber-integration` CI job.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
