aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3354292866


##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _attach_loop_start_id(self, output_state: State) -> None:
+        if "LoopStartId" in output_state:
+            return
+        output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+            "-main-0", 1
+        )[0]
+        # The URI lives on the upstream operator's output port (which
+        # LoopStart's first materialization reader is reading from).
+        reader_runnables = (
+            self.context.input_manager.get_input_port_mat_reader_threads()
+        )
+        output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+            next(iter(reader_runnables.values()))[0].uri
+        )

Review Comment:
   The substantive concern was already addressed by earlier work; this commit 
(a374596844) adds a defensive test.
   
   **1. URI is no longer in user state.** Commit `30ba48c39f` moved 
`loop_counter` / `LoopStartId` / `LoopStartStateURI` onto the typed 
`StateFrame` envelope (`core/models/payload.py`). The user-facing operator's 
`process_state(state, port)` callback only ever receives `frame.frame` (the 
inner `State` dict) — the envelope's scalar fields are siblings, not keys 
inside it. A grep finds no `state["LoopStartStateURI"]` or 
`output_state["LoopStartStateURI"]` accesses anywhere; the runtime captures the 
URI into `MainLoop._loop_start_state_uri` (`main_loop.py:374-375`) and reads it 
from there in `_jump_to_loop_start` (`main_loop.py:138`).
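
To make the envelope/inner-state split concrete, here is a minimal sketch. It is not the actual `StateFrame` from `core/models/payload.py`: the snake_case field names, the `user_visible_state` helper, and the sample URI are assumptions made for illustration. Only the `frame` attribute and the three metadata names come from the description above.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Stand-in for the real State type; assumed here to behave like a plain dict.
State = Dict[str, Any]


@dataclass
class StateFrame:
    """Illustrative envelope: metadata fields are siblings of `frame`."""

    frame: State
    # Hypothetical field names for loop_counter / LoopStartId / LoopStartStateURI.
    loop_counter: Optional[int] = None
    loop_start_id: Optional[str] = None
    loop_start_state_uri: Optional[str] = None


def user_visible_state(envelope: StateFrame) -> State:
    """Return what a process_state(state, port) callback would receive."""
    return envelope.frame


envelope = StateFrame(
    frame={"i": 3},
    loop_counter=2,
    loop_start_id="LoopStart-operator",          # made-up operator id
    loop_start_state_uri="vfs:///example/state",  # made-up URI
)
visible = user_visible_state(envelope)
```

Because the metadata lives on the dataclass rather than in the dict, a user operator that iterates over `visible` never sees it.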
   
   **2. New test pins the property end to end.** Added 
`test_user_state_excludes_envelope_metadata_on_consume_branch` in 
`test_main_loop.py`: builds a `StateFrame` with envelope metadata, drives it 
through `_process_state_frame` on the consume branch, and asserts (a) the 
runtime captured the envelope onto its instance fields, but (b) the 
operator-facing `state_processing_manager.current_input_state` carries only the 
inner State's keys — no `LoopStartId` / `LoopStartStateURI` / `loop_counter` as 
string keys. If a future refactor accidentally merges envelope fields into the 
inner state, the test breaks.
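
The shape of that test can be sketched as follows. This is a simplified stand-in, not the code in `test_main_loop.py`: `FakeStateFrame`, `FakeMainLoop`, and the snake_case fields are hypothetical, and the real `_process_state_frame` does more than this.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Keys that must never appear inside the operator-facing state.
ENVELOPE_KEYS = {"LoopStartId", "LoopStartStateURI", "loop_counter"}


@dataclass
class FakeStateFrame:
    frame: Dict[str, Any]
    loop_counter: Optional[int] = None
    loop_start_id: Optional[str] = None
    loop_start_state_uri: Optional[str] = None


class FakeStateProcessingManager:
    def __init__(self) -> None:
        self.current_input_state: Optional[Dict[str, Any]] = None


class FakeMainLoop:
    """Hypothetical stand-in for the consume branch of MainLoop."""

    def __init__(self) -> None:
        self.state_processing_manager = FakeStateProcessingManager()
        self._loop_start_id: Optional[str] = None
        self._loop_start_state_uri: Optional[str] = None
        self._loop_counter: Optional[int] = None

    def _process_state_frame(self, frame: FakeStateFrame) -> None:
        # (a) The runtime keeps the envelope metadata on its own fields.
        self._loop_start_id = frame.loop_start_id
        self._loop_start_state_uri = frame.loop_start_state_uri
        self._loop_counter = frame.loop_counter
        # (b) Only the inner state is handed to the operator.
        self.state_processing_manager.current_input_state = frame.frame


def test_user_state_excludes_envelope_metadata() -> None:
    loop = FakeMainLoop()
    frame = FakeStateFrame(
        frame={"user_key": "user_value"},
        loop_counter=1,
        loop_start_id="LoopStart-operator",
        loop_start_state_uri="vfs:///example/state",
    )
    loop._process_state_frame(frame)

    assert loop._loop_start_state_uri == "vfs:///example/state"
    assert loop._loop_start_id == "LoopStart-operator"
    assert loop._loop_counter == 1

    user_state = loop.state_processing_manager.current_input_state
    assert user_state == {"user_key": "user_value"}
    assert ENVELOPE_KEYS.isdisjoint(user_state.keys())
```

The `isdisjoint` check is the regression guard: merging any envelope field into the inner dict would make it fail.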
   
   **3. On the secondary hint**, *"LoopEnd already has the operator ID, so the 
URI doesn't need to travel through state"*: this is technically true, but it 
would require either a new controller RPC (LoopEnd asks the controller for 
LoopStart's input URI given the op id) or a runtime URI registry / canonical 
naming scheme. The envelope already separates the URI from user-visible state, 
so leaving the envelope field as-is keeps the back-edge write path simple. 
Happy to pursue removing the envelope field too if you'd prefer that direction; 
let me know.
   
   All 24 `test_main_loop.py` tests pass; the diff is scoped to that one test file.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.