aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3354292866
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
+ )
Review Comment:
The substantive concern was already addressed by an earlier commit; this
commit (a374596844) adds a defensive test.
**1. URI is no longer in user state.** Commit `30ba48c39f` moved
`loop_counter` / `LoopStartId` / `LoopStartStateURI` onto the typed
`StateFrame` envelope (`core/models/payload.py`). The user-facing operator's
`process_state(state, port)` callback only ever receives `frame.frame` (the
inner `State` dict); the envelope's scalar fields are siblings of that dict,
not keys inside it. A grep finds no `state["LoopStartStateURI"]` or
`output_state["LoopStartStateURI"]` accesses anywhere. Instead, the runtime
captures the URI into `MainLoop._loop_start_state_uri`
(`main_loop.py:374-375`) and reads it from there in `_jump_to_loop_start`
(`main_loop.py:138`).
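
To make the separation in point 1 concrete, here is a minimal sketch of an envelope whose metadata fields sit beside the inner state dict. The class layout, the snake_case field names, the example values, and the `deliver_to_operator` helper are assumptions for illustration only; they are not copied from `core/models/payload.py`.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Hypothetical stand-in for the inner user state (the real type is `State`).
State = Dict[str, Any]


@dataclass
class StateFrame:
    """Illustrative envelope: metadata fields are siblings of `frame`."""

    frame: State
    loop_counter: int = 0
    loop_start_id: Optional[str] = None
    loop_start_state_uri: Optional[str] = None


def deliver_to_operator(envelope: StateFrame) -> State:
    """Hypothetical helper: the operator callback sees only the inner dict."""
    return envelope.frame


envelope = StateFrame(
    frame={"i": 3},
    loop_counter=2,
    loop_start_id="example-loop-start",
    loop_start_state_uri="vfs:///example/state",
)
user_state = deliver_to_operator(envelope)
```

Because the URI is an attribute of the envelope rather than a key of `frame`, nothing the operator does with `user_state` can read or overwrite it.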
**2. New test pins the property end to end.** Added
`test_user_state_excludes_envelope_metadata_on_consume_branch` in
`test_main_loop.py`: builds a `StateFrame` with envelope metadata, drives it
through `_process_state_frame` on the consume branch, and asserts (a) the
runtime captured the envelope onto its instance fields, but (b) the
operator-facing `state_processing_manager.current_input_state` carries only the
inner State's keys — no `LoopStartId` / `LoopStartStateURI` / `loop_counter` as
string keys. If a future refactor accidentally merges envelope fields into the
inner state, the test breaks.
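
The shape of that test is roughly as follows. This is a self-contained sketch built on stand-in classes, not the test added in this commit: `FakeStateFrame`, `FakeRuntime`, and their field names are hypothetical and only mirror the two assertions described above.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Names that must never appear as string keys in the operator-facing state.
ENVELOPE_KEYS = {"LoopStartId", "LoopStartStateURI", "loop_counter"}


@dataclass
class FakeStateFrame:
    """Hypothetical envelope: inner state plus sibling metadata fields."""

    frame: Dict[str, Any]
    loop_counter: int
    loop_start_id: str
    loop_start_state_uri: str


class FakeRuntime:
    """Hypothetical stand-in for the consume branch of the runtime."""

    def __init__(self) -> None:
        self.loop_start_id: Optional[str] = None
        self.loop_start_state_uri: Optional[str] = None
        self.current_input_state: Optional[Dict[str, Any]] = None

    def process_state_frame(self, frame: FakeStateFrame) -> None:
        # (a) Capture the envelope metadata on instance fields.
        self.loop_start_id = frame.loop_start_id
        self.loop_start_state_uri = frame.loop_start_state_uri
        # (b) Expose only the inner dict to the operator side.
        self.current_input_state = dict(frame.frame)


def test_user_state_excludes_envelope_metadata() -> None:
    runtime = FakeRuntime()
    runtime.process_state_frame(
        FakeStateFrame(
            frame={"i": 3},
            loop_counter=1,
            loop_start_id="example-loop-start",
            loop_start_state_uri="vfs:///example/state",
        )
    )
    # (a) The runtime holds the metadata.
    assert runtime.loop_start_id == "example-loop-start"
    assert runtime.loop_start_state_uri == "vfs:///example/state"
    # (b) The operator-facing state carries only the inner keys.
    assert runtime.current_input_state == {"i": 3}
    assert ENVELOPE_KEYS.isdisjoint(runtime.current_input_state)
```

If `process_state_frame` were changed to merge the envelope fields into the inner dict, the final two assertions would fail, which is the regression the real test is meant to catch.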
**3. On the secondary hint** — *"LoopEnd already has the operator ID, so the
URI doesn't need to travel through state"*: technically true but requires
either a new controller RPC (LoopEnd asks the controller for LoopStart's input
URI given the op id) or a runtime URI registry / canonical naming scheme. The
envelope already separates the URI from user-visible state, so leaving the
envelope field as-is keeps the back-edge write path simple. Happy to pursue
removing the envelope field too if you'd prefer that direction — let me know.
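
For comparison, the registry alternative could look something like the sketch below. `LoopStartUriRegistry` and its methods are hypothetical. An in-process dict like this would only work if LoopStart and LoopEnd shared a process; otherwise the registry would have to live behind the controller or some shared store, which is the extra machinery mentioned above.

```python
from typing import Dict


class LoopStartUriRegistry:
    """Hypothetical registry mapping a LoopStart operator id to its state URI."""

    def __init__(self) -> None:
        self._uris: Dict[str, str] = {}

    def register(self, loop_start_id: str, state_uri: str) -> None:
        # LoopStart would call this once it knows its upstream state URI.
        self._uris[loop_start_id] = state_uri

    def lookup(self, loop_start_id: str) -> str:
        # LoopEnd would call this instead of reading the envelope field.
        try:
            return self._uris[loop_start_id]
        except KeyError:
            raise LookupError(
                f"no state URI registered for {loop_start_id!r}"
            ) from None


registry = LoopStartUriRegistry()
registry.register("example-loop-start", "vfs:///example/state")
resolved = registry.lookup("example-loop-start")
```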
All 24 tests in `test_main_loop.py` pass, and the diff is scoped to that one
test file.