aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3406801998


##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:

Review Comment:
   You're right: that earlier work was lost when the branch was rebased and
squashed. I re-applied it on the current branch in 2095b592ee.
   
   * **Renamed** `reset_storage` → `reset_output_storage` (and updated the 
caller in `main_loop.py` plus the two `__init__` / `set_up_port_storage_writer` 
comments that already referenced the intended name).
   * **Docstring** now states what the method does (drop and recreate the
result and state tables, then reopen the writers), that it is called only by a
Loop End worker, once per iteration, and **why truncating live storage is
safe**, which previously lived only in the PR description: a loop forces
MATERIALIZED execution mode, so downstream operators don't begin reading this
output until the loop region has fully completed, and no reader can observe an
intermediate truncation.
   * **Guards**: the two previously implicit preconditions now raise a clear
`RuntimeError` instead of silently resetting the wrong port or dereferencing
`None`: (1) there is exactly one output port, and (2)
`set_up_port_storage_writer` has already run (`_storage_uri_base` is populated).
   * **Tests**: the new `TestResetOutputStorage` in `test_output_manager.py`
covers the happy path (close → recreate result and state docs → reopen writer)
and both guard failures, with the Iceberg and thread collaborators mocked.
   
   All 9 tests in `test_output_manager.py` and all 24 in `test_main_loop.py` pass.
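
For readers without the diff open, here is a minimal sketch of the two guards and the reset order described above. It is not the code from 2095b592ee: `OutputManagerSketch`, `add_output_port`, `_port_ids`, and `steps` are hypothetical names invented for illustration, the order in which the guards are checked is an assumption, and the real close / drop-and-recreate / reopen calls are replaced by log entries so the sketch runs standalone.

```python
from typing import List, Optional


class OutputManagerSketch:
    """Toy stand-in for OutputManager; models only the reset guards."""

    def __init__(self) -> None:
        # In the real class this is populated by set_up_port_storage_writer.
        self._storage_uri_base: Optional[str] = None
        # Hypothetical: ids of the registered output ports.
        self._port_ids: List[str] = []
        # Hypothetical: log of reset steps, kept so the order is observable.
        self.steps: List[str] = []

    def add_output_port(self, port_id: str) -> None:
        self._port_ids.append(port_id)

    def set_up_port_storage_writer(self, storage_uri_base: str) -> None:
        self._storage_uri_base = storage_uri_base

    def reset_output_storage(self) -> None:
        # Guard 1: refuse to guess which port to reset.
        if len(self._port_ids) != 1:
            raise RuntimeError(
                "reset_output_storage expects exactly one output port, "
                f"found {len(self._port_ids)}"
            )
        # Guard 2: the storage URI base must exist before anything is dropped.
        if self._storage_uri_base is None:
            raise RuntimeError(
                "reset_output_storage called before set_up_port_storage_writer"
            )
        port_id = self._port_ids[0]
        base = self._storage_uri_base
        # Placeholders for the real close / drop-and-recreate / reopen calls.
        self.steps.append(f"close writer for {port_id}")
        self.steps.append(f"recreate result table under {base}")
        self.steps.append(f"recreate state table under {base}")
        self.steps.append(f"reopen writer for {port_id}")
```

Both guards run before the first destructive step, so a failed precondition leaves the existing tables untouched.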



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
