aglinxinyuan commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3337639407
##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]
time, or None.
"""
yield
+
+
+class LoopStartOperator(TableOperator):
+ @overrides.final
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ if "LoopStartStateURI" in state:
+ state["loop_counter"] += 1
Review Comment:
Addressed by moving `loop_counter` off `State` entirely (007a264b5). It is
no longer a key in the `State` dict the operator is handed — the loop operators
never read or mutate it. It now rides on the `StateFrame` transport envelope
and the worker runtime owns it: `main_loop._process_state_frame` applies the
`+1`/`-1` and handles the LoopStart/LoopEnd nested pass-through before the
operator runs (so the generated LoopEnd is now consume-only). It is
materialized/serialized as its own column via a new bilingual `StateStorage`
(`content` STRING, `loop_counter` LONG) format, so `State` and its
single-column schema stay pure user content. Operator-level counter coverage
was relocated to `main_loop` runtime tests, with a new `StateStorageSpec` for
the column round-trip.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]