aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3337639407


##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> 
Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+class LoopStartOperator(TableOperator):
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        if "LoopStartStateURI" in state:
+            state["loop_counter"] += 1

Review Comment:
   Addressed by moving `loop_counter` out of the `State` content dict entirely 
(latest: 63d243353). The loop operators never read or mutate it. It rides on 
the `StateFrame` transport envelope and the worker runtime owns it: 
`main_loop._process_state_frame` applies the `+1`/`-1` and handles the 
LoopStart/LoopEnd nested pass-through before the operator runs (so the 
generated LoopEnd is now consume-only). It is materialized/serialized as its 
own `loop_counter` column parallel to `content`: `State.SCHEMA` is the 
two-column schema and `State.to_tuple(loop_counter)` writes both columns, while 
`from_tuple` returns the bare State (the readers that need the counter read the 
column directly). The user state JSON stays clean. Operator-level counter 
coverage was relocated to `main_loop` runtime tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to