Xiao-zhen-Liu commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3400851469


##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:

Review Comment:
   **[resolved thread 17 — but not in the branch] → PR-D**
   
   Thread 17 was closed with a reply describing a rename to 
`reset_output_storage`, a docstring, and `RuntimeError` guards (cited 
`e6bea518f2`). None of that is here: the method is still `reset_storage`, 
undocumented, with no precondition checks. The "why truncation is safe under 
MATERIALIZED" rationale still lives only in the PR description. Please re-open 
— add the docstring + the one-output-port / writer-initialized guards, or point 
me at the commit if it was lost in a rebase.
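   
   A minimal sketch of the guards I have in mind, assuming the rename described in the thread-17 reply. `OutputManagerSketch`, its constructor, and the returned port id are hypothetical stand-ins rather than the real `OutputManager`; only the precondition checks are modeled:

```python
from typing import Dict, List


class OutputManagerSketch:
    """Hypothetical stand-in for OutputManager; only the guards are modeled."""

    def __init__(self, port_ids: List[str], writers: Dict[str, object]) -> None:
        self._port_ids = port_ids
        self._port_state_writers = writers

    def reset_output_storage(self) -> str:
        """Truncate the single output port's result and state storage.

        Preconditions (assumed from the review thread): exactly one output
        port, and a storage writer already initialized for it.
        """
        if len(self._port_ids) != 1:
            raise RuntimeError(
                "reset_output_storage expects exactly one output port, "
                f"got {len(self._port_ids)}"
            )
        port_id = self._port_ids[0]
        if port_id not in self._port_state_writers:
            raise RuntimeError(
                f"storage writer for port {port_id!r} is not initialized"
            )
        # The real method would close the writers and recreate the documents
        # here; this sketch only returns the port it would reset.
        return port_id
```

   The docstring is also where the "why truncation is safe under MATERIALIZED" rationale could live, so it no longer exists only in the PR description.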



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:
+        port_id = self.get_port_ids()[0]
+        storage_uri_base = self._storage_uri_base
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(

Review Comment:
   **[new — highest severity] → PR-D**
   
   `reset_storage` recreates **both** the result table 
(output_manager.py:241-244) and the state table (output_manager.py:245-247) 
with `override_if_exists=True`, on every consumed iteration. This is in direct 
tension with `RegionExecutionCoordinator.scala:579-589`, which deliberately 
*skips* recreating LoopEnd's documents on a region re-run, with the comment 
*"recreating it would erase what we just wrote."* One layer preserves the 
output across re-execution; the other truncates it each iteration. Please 
explain why the accumulated output isn't erased. If truncation is intended, 
then the scheduler comment and the PR description are both wrong and need 
updating.
   
   Related: the PR description says `reset_storage` "only fires on the inner 
Loop End of a nested loop … single loops never hit this path", but it's called 
for *any* `LoopEndOperator` that produced output state (`main_loop.py:267`), 
independent of `loop_counter`. Description and gating disagree.



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +101,69 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _compute_loop_start_id(self) -> typing.Tuple[str, str]:
+        # A LoopStart stamps its own operator id and the iceberg URI its input
+        # is read from onto the state it emits; the matching LoopEnd reads them
+        # back to jump. These ride the StateFrame envelope, not user state.
+        loop_start_id = get_operator_id(self.context.worker_id)
+        # The URI lives on the upstream operator's output port (which
+        # LoopStart's first materialization reader is reading from). LoopStart
+        # is constrained to a single input port + single reader, so fail loud
+        # rather than silently picking whichever dict iterator yields first --
+        # that would mask future graph mistakes by choosing an arbitrary URI.
+        reader_runnables = (
+            self.context.input_manager.get_input_port_mat_reader_threads()
+        )
+        if len(reader_runnables) != 1:
+            raise RuntimeError(
+                f"LoopStart expected exactly one input port, "
+                f"got {len(reader_runnables)}"
+            )
+        [(_, readers)] = reader_runnables.items()
+        if len(readers) != 1:
+            raise RuntimeError(
+                f"LoopStart expected exactly one input reader on its port, "
+                f"got {len(readers)}"
+            )
+        loop_start_state_uri = VFSURIFactory.state_uri(readers[0].uri)
+        return loop_start_id, loop_start_state_uri
+
+    def _jump_to_loop_start(
+        self, executor: LoopEndOperator, controller_interface
+    ) -> None:
+        state = executor.state
+        controller_interface.jump_to_operator_region(
+            JumpToOperatorRegionRequest(OperatorIdentity(self._loop_start_id))
+        )
+        uri = self._loop_start_state_uri
+        # Strip the per-iteration scratch (`table`, `output`) so only the
+        # user-visible loop state is written back to LoopStart's input. The
+        # loop metadata (counter, LoopStartId, LoopStartStateURI) is owned by
+        # the runtime and never lived in this dict.
+        for key in ("table", "output"):
+            state.pop(key, None)
+        writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+        # The back-edge fires only after the matching LoopEnd consumed at
+        # loop_counter == 0, so the next iteration's input starts at depth 0.
+        writer.put_one(State(state).to_tuple(0))
+        writer.close()
+
     def complete(self) -> None:
         """
         Complete the DataProcessor, marking state to COMPLETED, and notify the
         controller.
         """
         # flush the buffered console prints
         self._check_and_report_console_messages(force_flush=True)
-        self.context.executor_manager.executor.close()
+        controller_interface = self._async_rpc_client.controller_stub()
+        executor = self.context.executor_manager.executor
+        if isinstance(executor, LoopEndOperator) and executor.condition():

Review Comment:
   **[thread 16 pushback — partially incorrect] → PR-D**
   
   The pushback ("no error handling, same design as Python UDF") is fair for 
`update` / `initialization` / `output` — those run on the guarded DataProcessor 
path and surface as a reported error/pause. But it's factually wrong for 
`condition()`: it's evaluated here on the MainLoop thread inside `complete()`, 
**outside** that path, and ahead of `executor.close()` / the COMPLETED 
transition. A user typo in `condition` won't surface like a UDF exception — 
it'll fail the worker thread mid-completion. Worth guarding this one call or 
documenting the asymmetry.
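   
   One way to guard that single call, as a sketch only. `evaluate_condition_guarded` is a hypothetical helper, and how the captured exception is reported back to the controller is left out because I have not checked which error channel `complete()` can use:

```python
from typing import Callable, Optional, Tuple


def evaluate_condition_guarded(
    condition: Callable[[], bool],
) -> Tuple[bool, Optional[BaseException]]:
    """Evaluate a user-supplied loop condition without letting it escape.

    Returns (should_jump, error). On failure the loop does not jump back,
    and the caller receives the exception so it can report it before the
    worker finishes closing the executor.
    """
    try:
        return bool(condition()), None
    except Exception as exc:  # user code can raise anything
        return False, exc
```

   With this shape, `complete()` would call `evaluate_condition_guarded(executor.condition)`, still reach `executor.close()`, and surface `error` if it is not `None`. Treating a failed condition as "do not jump" is my assumption; failing the workflow explicitly is an equally valid choice.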



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +576,17 @@ class RegionExecutionCoordinator(
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-        DocumentFactory.createDocument(resultURI, schema)
-        DocumentFactory.createDocument(stateURI, State.schema)
+        // LoopEnd operators may re-execute the region multiple times; on
+        // subsequent iterations the result/state documents already exist,
+        // and `createDocument` (overrideIfExists=true) would clobber them.
+        // Skip the create call when the document is already there.
+        val isLoopEndRegion = region.getOperators.exists(_.isLoopEnd)

Review Comment:
   **[resolved threads 21 & 22 — but not in the branch] → PR-C**
   
   Both were closed with replies citing commits (`ca9e5ce8cc`, `540b7ba274`, 
`bbec98282e`) that aren't in the branch. The flag is still the type-named 
`isLoopEnd` (not a behavior name like `reusesOutputStorageOnReExecution`, 
thread 22), and this skip-create branch is still inline and untested (thread 
21). In the split these belong in PR-C — please add the behavior-named flag and 
a test that pre-creates the documents and asserts they're reused, not recreated.



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -189,20 +253,52 @@ def process_input_tuple(self) -> None:
                     output_tuple
                 )
 
-    def process_input_state(self) -> None:
+    def process_input_state(
+        self,
+        output_loop_counter: int = 0,
+        output_loop_start_id: str = "",
+        output_loop_start_state_uri: str = "",
+    ) -> None:
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
         if output_state is not None:
-            for to, batch in self.context.output_manager.emit_state(output_state):
-                self._output_queue.put(
-                    DataElement(
-                        tag=ChannelIdentity(
-                            ActorVirtualIdentity(self.context.worker_id), to, False
-                        ),
-                        payload=batch,
-                    )
+            executor = self.context.executor_manager.executor
+            if isinstance(executor, LoopEndOperator):
+                self.context.output_manager.reset_storage()

Review Comment:
   **[new] → PR-D**
   
   `reset_storage()` is invoked for any `LoopEndOperator` with output state 
here, regardless of `loop_counter`. The PR description claims it only fires on 
the inner LoopEnd of a nested loop. See the related note on 
`output_manager.py:241`.



##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,177 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+# Names the loop runtime owns inside the eval namespaces and across the loop
+# boundary; explicitly stripped from every state dict that crosses Loop
+# Start / Loop End so user code can neither read nor persist them.
+# Other reserved names that used to live in user state -- ``loop_counter``,
+# ``LoopStartId``, ``LoopStartStateURI`` -- are no longer in ``self.state``
+# at all; they ride the StateFrame envelope (see ``core.models.payload``)
+# and are stamped/captured by ``MainLoop._process_state_frame``.
+_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+
+
+class LoopStartOperator(TableOperator):
+    """Base class for the runtime side of a Loop Start operator.
+
+    The generator in ``LoopStartOpDesc.scala`` emits a thin
+    ``ProcessLoopStartOperator(LoopStartOperator)`` subclass that does
+    nothing more than wire the user-supplied ``initialization`` and
+    ``output`` expressions into ``open()`` and ``process_table()``; all
+    substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``open()`` runs once when the worker starts. The generated subclass
+      executes the user's ``initialization`` against a fresh ``self.state``
+      dict; after it returns ``self.state`` holds *only* the user's loop
+      variables.
+    * ``process_state(state, port)`` (final) runs once when upstream sends
+      this LoopStart its input state; it merges that state into
+      ``self.state``. The nested pass-through branch and all
+      ``loop_counter`` bookkeeping live in
+      ``MainLoop._process_state_frame``, not here.
+    * ``process_table(table, port)`` is provided by the generated subclass
+      and yields a downstream row via ``eval_output(...)`` against the
+      user's ``output`` expression.
+    * ``produce_state_on_finish(port)`` (final) emits the state crossing
+      the boundary to the matching LoopEnd: user variables plus the
+      pickled input table.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``open()`` and ``process_table()``
+    only. All other methods are ``@overrides.final``; do not override
+    them. After ``open()`` returns, ``self.state`` must be a dict
+    containing the user's loop variables (none of the reserved names in
+    ``_RESERVED_STATE_KEYS``).
+
+    Reserved names
+    --------------
+    * ``loop_counter`` / ``LoopStartId`` / ``LoopStartStateURI`` -- live on
+      the StateFrame envelope (``core.models.payload``), not in
+      ``self.state``. Stamped by this operator's worker via
+      ``MainLoop._compute_loop_start_id``.
+    * ``table`` / ``output`` -- transient names only available inside the
+      ``eval_output`` throwaway namespace; never persisted in
+      ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        # First-entry only: merge upstream state into self.state. The nested
+        # pass-through (state already carrying LoopStartStateURI) and all
+        # loop_counter bookkeeping are owned by the worker runtime
+        # (main_loop._process_state_frame), so this operator never sees the
+        # counter and never mutates the State it is handed.
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def eval_output(self, output_expr: str, table: Table) -> TableLike:
+        # Run the user's `output` expression in a throwaway namespace seeded
+        # with the loop variables and the input `table`. This lets user code
+        # read `table` and define `output` without those reserved names leaking
+        # into -- or being silently clobbered out of -- the persistent loop
+        # state (self.state), addressing the exec-namespace collision.
+        namespace = {**self.state, "table": table}
+        exec("output = " + output_expr, {}, namespace)
+        return namespace["output"]
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        # Emit the user's loop variables plus the pickled input table for the
+        # matching LoopEnd. `table`/`output` are runtime-reserved and are not
+        # kept in self.state, so drop any stray ones before adding the real
+        # pickled table.
+        produced = {
+            key: value
+            for key, value in self.state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        produced["table"] = dumps(Table(self._TableOperator__table_data[port]))

Review Comment:
   **[resolved thread 19 — but not in the branch] → PR-D**
   
   Thread 19 was closed claiming a protected 
`TableOperator._buffered_table(port)` accessor plus an Arrow-IPC replacement 
for pickle (cited `e281c61b4c`). Neither is here: this line still reads the 
parent's private buffer via the name-mangled 
`self._TableOperator__table_data[port]`, and `pickle.dumps`/`loads` is still 
the table transport (see `loads` at operator.py:444). The name-mangling 
silently breaks if `TableOperator` is renamed; `pickle.loads` on state read 
back from iceberg is the RCE surface raised in the first review. Please re-open 
both.
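   
   To make both concerns concrete, here is a small self-contained sketch. The classes are toy stand-ins (this `TableOperator` is not the real one), `_buffered_table` is the accessor name from the thread-19 reply, and `CallsOnLoad` is a harmless illustration of how `pickle.loads` invokes a callable chosen by the payload:

```python
import pickle


class TableOperator:
    """Toy stand-in for the real base class."""

    def __init__(self) -> None:
        # The double underscore triggers name mangling: the attribute is
        # stored on the instance as _TableOperator__table_data.
        self.__table_data = {0: ["row-a", "row-b"]}

    def _buffered_table(self, port: int) -> list:
        # Hypothetical protected accessor; subclasses call this instead of
        # reaching for the mangled name.
        return self.__table_data[port]


class LoopStartSketch(TableOperator):
    def via_mangled_name(self, port: int) -> list:
        # Works only while the parent class is named exactly TableOperator.
        return self._TableOperator__table_data[port]

    def via_accessor(self, port: int) -> list:
        return self._buffered_table(port)


class CallsOnLoad:
    """Unpickling this object calls len("abc") instead of rebuilding it."""

    def __reduce__(self):
        return (len, ("abc",))


op = LoopStartSketch()
same_rows = op.via_mangled_name(0) == op.via_accessor(0)
loaded = pickle.loads(pickle.dumps(CallsOnLoad()))
```

   Renaming the parent class changes the mangled attribute name, so `via_mangled_name` would raise `AttributeError` while `via_accessor` keeps working. `loaded` is the return value of the callable named in the payload, not a `CallsOnLoad` instance; with a less benign callable, that is the code-execution path.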



##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,177 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+# Names the loop runtime owns inside the eval namespaces and across the loop
+# boundary; explicitly stripped from every state dict that crosses Loop
+# Start / Loop End so user code can neither read nor persist them.
+# Other reserved names that used to live in user state -- ``loop_counter``,
+# ``LoopStartId``, ``LoopStartStateURI`` -- are no longer in ``self.state``
+# at all; they ride the StateFrame envelope (see ``core.models.payload``)
+# and are stamped/captured by ``MainLoop._process_state_frame``.
+_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+
+
+class LoopStartOperator(TableOperator):
+    """Base class for the runtime side of a Loop Start operator.
+
+    The generator in ``LoopStartOpDesc.scala`` emits a thin
+    ``ProcessLoopStartOperator(LoopStartOperator)`` subclass that does
+    nothing more than wire the user-supplied ``initialization`` and
+    ``output`` expressions into ``open()`` and ``process_table()``; all
+    substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``open()`` runs once when the worker starts. The generated subclass
+      executes the user's ``initialization`` against a fresh ``self.state``
+      dict; after it returns ``self.state`` holds *only* the user's loop
+      variables.
+    * ``process_state(state, port)`` (final) runs once when upstream sends
+      this LoopStart its input state; it merges that state into
+      ``self.state``. The nested pass-through branch and all
+      ``loop_counter`` bookkeeping live in
+      ``MainLoop._process_state_frame``, not here.
+    * ``process_table(table, port)`` is provided by the generated subclass
+      and yields a downstream row via ``eval_output(...)`` against the
+      user's ``output`` expression.
+    * ``produce_state_on_finish(port)`` (final) emits the state crossing
+      the boundary to the matching LoopEnd: user variables plus the
+      pickled input table.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``open()`` and ``process_table()``
+    only. All other methods are ``@overrides.final``; do not override
+    them. After ``open()`` returns, ``self.state`` must be a dict
+    containing the user's loop variables (none of the reserved names in
+    ``_RESERVED_STATE_KEYS``).
+
+    Reserved names
+    --------------
+    * ``loop_counter`` / ``LoopStartId`` / ``LoopStartStateURI`` -- live on
+      the StateFrame envelope (``core.models.payload``), not in
+      ``self.state``. Stamped by this operator's worker via
+      ``MainLoop._compute_loop_start_id``.
+    * ``table`` / ``output`` -- transient names only available inside the
+      ``eval_output`` throwaway namespace; never persisted in
+      ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        # First-entry only: merge upstream state into self.state. The nested
+        # pass-through (state already carrying LoopStartStateURI) and all
+        # loop_counter bookkeeping are owned by the worker runtime
+        # (main_loop._process_state_frame), so this operator never sees the
+        # counter and never mutates the State it is handed.
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def eval_output(self, output_expr: str, table: Table) -> TableLike:
+        # Run the user's `output` expression in a throwaway namespace seeded
+        # with the loop variables and the input `table`. This lets user code
+        # read `table` and define `output` without those reserved names leaking
+        # into -- or being silently clobbered out of -- the persistent loop
+        # state (self.state), addressing the exec-namespace collision.
+        namespace = {**self.state, "table": table}
+        exec("output = " + output_expr, {}, namespace)
+        return namespace["output"]
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        # Emit the user's loop variables plus the pickled input table for the
+        # matching LoopEnd. `table`/`output` are runtime-reserved and are not
+        # kept in self.state, so drop any stray ones before adding the real
+        # pickled table.
+        produced = {
+            key: value
+            for key, value in self.state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        produced["table"] = dumps(Table(self._TableOperator__table_data[port]))
+        return produced
+
+
+class LoopEndOperator(TableOperator):
+    """Base class for the runtime side of a Loop End operator.
+
+    The generator in ``LoopEndOpDesc.scala`` emits a thin
+    ``ProcessLoopEndOperator(LoopEndOperator)`` subclass that wires the
+    user-supplied ``update`` expression into ``process_state(...)`` (via
+    ``run_update``) and the ``condition`` expression into ``condition()``
+    (via ``eval_condition``); all substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``process_table(table, port)`` (final) yields each input table
+      through as-is.
+    * ``process_state(state, port)`` is provided by the generated
+      subclass. It calls ``run_update(update_code, state)`` to unpickle
+      the input table, run the user's ``update`` in a throwaway
+      namespace, stash the table on ``self._loop_table``, and persist
+      only user variables back into ``self.state``. Returns ``None``.
+    * ``condition()`` is the abstract method the generated subclass
+      implements by delegating to ``eval_condition(...)`` against the
+      user's ``condition`` expression. Called by ``MainLoop.complete()``
+      to decide whether to fire the back-edge via
+      ``_jump_to_loop_start``.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``process_state()`` (delegating to
+    ``run_update``) and ``condition()`` (delegating to
+    ``eval_condition``). All other methods are ``@overrides.final``; do
+    not override them.
+
+    Reserved names
+    --------------
+    Same as ``LoopStartOperator``: ``loop_counter`` / ``LoopStartId`` /
+    ``LoopStartStateURI`` live on the StateFrame envelope (never in user
+    state); ``table`` / ``output`` are transient names available only
+    inside ``run_update`` / ``eval_condition``'s throwaway namespace and
+    are stripped from ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
+        yield table
+
+    @overrides.final
+    def run_update(self, update_code: str, state: State) -> None:
+        # Run the user's `update` in a throwaway namespace seeded with the
+        # incoming loop variables and the unpickled input table, then persist
+        # only the user variables back into self.state. `table`/`output` are
+        # runtime-reserved and never persist, so user code cannot silently
+        # clobber loop machinery through them. The real input table is kept on
+        # self._loop_table so condition() can read it after the update.
+        from pickle import loads
+
+        table = loads(state["table"])
+        namespace = {
+            key: value
+            for key, value in state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        namespace["table"] = table
+        exec(update_code, {}, namespace)
+        self._loop_table = table
+        self.state = {
+            key: value
+            for key, value in namespace.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+
+    @overrides.final
+    def eval_condition(self, condition_expr: str) -> bool:

Review Comment:
   **[new] → PR-D**
   
   `eval_condition` reads `self._loop_table` and `self.state`, which are 
created **only** inside `run_update` (operator.py:452-453). 
`MainLoop.complete()` calls `condition()` for any `LoopEndOperator`, gated only 
by `isinstance`. A LoopEnd that reaches completion without having consumed 
state (empty input, or an inner LoopEnd that only saw outer-loop pass-through 
frames) would raise `AttributeError` here. Please confirm this path is 
unreachable, or initialize `self.state` / `self._loop_table` in `__init__`.
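   
   A sketch of the second option, under my own assumptions: `LoopEndSketch` is a simplified stand-in for `LoopEndOperator` (no pickling, no reserved-key filtering beyond `table`), and returning `False` when no state was consumed is a guess at the intended behavior, not something the PR specifies:

```python
from typing import Any, Dict, Optional


class LoopEndSketch:
    """Simplified stand-in for LoopEndOperator with eager initialization."""

    def __init__(self) -> None:
        # Defined up front so eval_condition never hits AttributeError.
        self.state: Dict[str, Any] = {}
        self._loop_table: Optional[Any] = None

    def run_update(self, update_code: str, state: Dict[str, Any]) -> None:
        # Run the user's update in a throwaway namespace, then keep the
        # table separately and persist only the remaining variables.
        namespace = dict(state)
        exec(update_code, {}, namespace)
        self._loop_table = namespace.pop("table", None)
        self.state = namespace

    def eval_condition(self, condition_expr: str) -> bool:
        if self._loop_table is None:
            # No state was consumed, so there is nothing to loop over.
            return False
        namespace = {**self.state, "table": self._loop_table}
        return bool(eval(condition_expr, {}, namespace))
```

   For example, `LoopEndSketch().eval_condition("i < len(table)")` returns `False` instead of raising, and after `run_update("i += 1", {"i": 0, "table": [1, 2, 3]})` the same expression is evaluated against `i == 1` and the three-row table.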



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala:
##########
@@ -198,6 +198,7 @@ case class PhysicalOp(
     // schema propagation function
     propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas),
     isOneToManyOp: Boolean = false,
+    isLoopEnd: Boolean = false,

Review Comment:
   **[resolved thread 22 — but not in the branch] → PR-C**
   
   Still `isLoopEnd` — named after the operator type, not the behavior the 
scheduler keys on ("this operator's output storage must survive a region 
re-run"). The reply's `reusesOutputStorageOnReExecution` rename isn't anywhere 
in the code or history. Please rename so the next operator needing the same 
behavior can reuse it without a misnomer.



##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala:
##########
@@ -66,6 +67,23 @@ class WorkflowExecutionService(
 ) extends SubscriptionManager
     with LazyLogging {
 
+  // Loops require materialized edges to carry state between iterations.
+  // Previously we silently rewrote the user's execution mode to
+  // MATERIALIZED here, but that left the UI displaying the user's
+  // original (e.g. PIPELINED) choice while the engine ran something
+  // else -- the user had no way to tell the two had diverged. Fail
+  // loudly instead: surface a fatal error with an actionable message so
+  // the user can update the workflow setting and re-run.
+  if (
+    request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])

Review Comment:
   **[resolved thread 11 — but not in the branch] → PR-D (logic), needs a test**
   
   Thread 11 was closed claiming a generic 
`LogicalOp.requiresMaterializedExecution` flag and a 7-case spec (cited 
`1848ce00fb`). Neither exists: line 78 still does 
`request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])`, 
keeping the web service coupled to a concrete operator class, with no test for 
the rule. It also only checks `LoopStartOpDesc`, so a LoopEnd-only plan would 
slip through the materialized-mode validation. Please re-open.



##########
amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.e2e
+
+import com.twitter.util.{Await, Duration, Promise}
+import org.apache.pekko.actor.{ActorSystem, Props}
+import org.apache.pekko.testkit.{ImplicitSender, TestKit}
+import org.apache.pekko.util.Timeout
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
+import org.apache.texera.amber.engine.architecture.controller._
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.client.AmberClient
+import org.apache.texera.amber.engine.e2e.TestUtils.{
+  buildWorkflow,
+  cleanupWorkflowExecutionData,
+  initiateTexeraDBForTestCases,
+  setUpWorkflowExecutionData
+}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
+import org.apache.texera.amber.tags.IntegrationTest
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
+
+import scala.concurrent.duration.DurationInt
+
+/**
+  * End-to-end loop tests: run a real TextInput -> LoopStart -> LoopEnd workflow
+  * through the engine (controller + Python workers + the DCM back-jump and
+  * region re-execution) and assert it reaches COMPLETED. Termination is the
+  * assertion -- a loop whose counter / state hand-off were broken by the
+  * "loop_counter / LoopStartId / LoopStartStateURI off State" and guarded
+  * exec-namespace changes would hang and time out rather than complete.
+  *
+  * Tagged @IntegrationTest because it spawns Python workers; routed to the
+  * `amber-integration` CI job.
+  */
+@IntegrationTest
+class LoopIntegrationSpec
+    extends TestKit(ActorSystem("LoopIntegrationSpec", AmberRuntime.pekkoConfig))
+    with ImplicitSender
+    with AnyFlatSpecLike
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Retries {
+
+  override def withFixture(test: NoArgTest): Outcome =
+    withRetry { super.withFixture(test) }
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+
+  override protected def beforeEach(): Unit = setUpWorkflowExecutionData()
+
+  override protected def afterEach(): Unit = cleanupWorkflowExecutionData()
+
+  override def beforeAll(): Unit = {
+    system.actorOf(Props[SingleNodeListener](), "cluster-info")
+    Class.forName("org.postgresql.Driver")
+    initiateTexeraDBForTestCases()
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  // Loops require MATERIALIZED execution mode (the cross-region state channel
+  // is the loop back-edge).
+  private def materializedContext(): WorkflowContext =
+    new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+
+  private def runToCompletion(
+      operators: List[LogicalOp],
+      links: List[LogicalLink]
+  ): Unit = {
+    val workflow = buildWorkflow(operators, links, materializedContext())
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      _ => {}
+    )
+    val completion = Promise[Unit]()
+    client.registerCallback[FatalError](evt => {
+      completion.setException(evt.e)
+      client.shutdown()
+    })
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) completion.setDone()
+    })
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    // A correct loop terminates; a broken one hangs until this deadline.
+    Await.result(completion, Duration.fromMinutes(3))
+    client.shutdown()
+  }
+
+  private def textInput(text: String): TextInputSourceOpDesc = {
+    val op = new TextInputSourceOpDesc()
+    op.textInput = text
+    op
+  }
+
+  private def loopStart(initialization: String, output: String): LoopStartOpDesc = {
+    val op = new LoopStartOpDesc()
+    op.initialization = initialization
+    op.output = output
+    op
+  }
+
+  private def loopEnd(update: String, condition: String): LoopEndOpDesc = {
+    val op = new LoopEndOpDesc()
+    op.update = update
+    op.condition = condition
+    op
+  }
+
+  private def link(from: LogicalOp, to: LogicalOp): LogicalLink =
+    LogicalLink(from.operatorIdentifier, PortIdentity(), to.operatorIdentifier, PortIdentity())
+
+  "Engine" should "run a single TextInput -> LoopStart -> LoopEnd loop to completion" in {
+    val src = textInput("1\n2\n3")
+    val start = loopStart("i = 0", "table.iloc[i]")
+    val end = loopEnd("i += 1", "i < len(table)")
+    runToCompletion(
+      List(src, start, end),
+      List(link(src, start), link(start, end))
+    )
+    succeed // reaching COMPLETED within the deadline is the assertion

Review Comment:
   **[new — test gap] → PR-D**
   
   Both e2e cases assert only that the workflow reaches `COMPLETED`, then 
`succeed` — no result table, row count, or iteration count is ever read. For a 
loop, the iteration count **is** the core correctness property: a 
routing/counter off-by-one that still terminates would pass this test. The PR 
description's "3 iterations" / "9 nested" numbers are exactly what should be 
asserted. Please read the sink output and check the counts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
