Xiao-zhen-Liu commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3400851469


##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:

Review Comment:
   **Resolved thread 17, but the fix isn't in the branch.** The reply described 
a rename to `reset_output_storage`, a docstring, and guards (commit 
`e6bea518f2`), but the method here is still `reset_storage`, undocumented, with 
no checks. The reason truncation is safe (downstream is paused while in 
MATERIALIZED mode) lives only in the PR description. Please re-open, or point 
me at the commit if it was lost.
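A minimal sketch of what the described fix could look like, assuming the guards are "MATERIALIZED mode only" and "exactly one output port" (the current body silently takes `get_port_ids()[0]`). `OutputStorageSketch`, `ExecutionMode`, and the in-memory tables are hypothetical stand-ins, not Texera's `OutputManager` API:

```python
from enum import Enum
from typing import Dict, List


class ExecutionMode(Enum):
    # Hypothetical stand-in for the workflow's execution-mode setting.
    PIPELINED = "PIPELINED"
    MATERIALIZED = "MATERIALIZED"


class OutputStorageSketch:
    """Hypothetical stand-in for OutputManager's per-port output storage."""

    def __init__(self, mode: ExecutionMode, port_ids: List[int]) -> None:
        self._mode = mode
        # One in-memory "result table" per output port.
        self._tables: Dict[int, List[object]] = {p: [] for p in port_ids}

    def write(self, port_id: int, row: object) -> None:
        self._tables[port_id].append(row)

    def rows(self, port_id: int) -> List[object]:
        return list(self._tables[port_id])

    def reset_output_storage(self) -> None:
        """Truncate the single output port's table before the next iteration.

        Safe only in MATERIALIZED mode: per the PR description, downstream
        is paused in that mode, so nothing reads the table mid-reset.
        """
        if self._mode is not ExecutionMode.MATERIALIZED:
            raise RuntimeError(
                f"reset_output_storage requires MATERIALIZED mode, got {self._mode.value}"
            )
        if len(self._tables) != 1:
            raise RuntimeError(
                f"reset_output_storage expects exactly one output port, "
                f"got {len(self._tables)}"
            )
        (port_id,) = self._tables  # the only key
        self._tables[port_id] = []
```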



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,20 +211,42 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:
+        port_id = self.get_port_ids()[0]
+        storage_uri_base = self._storage_uri_base
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(

Review Comment:
   **Possible data loss.** This recreates both the result and state tables 
every iteration (`override_if_exists=True`), but 
`RegionExecutionCoordinator.scala:579-589` deliberately does *not* recreate 
LoopEnd's documents on a re-run — its comment says recreating them "would erase 
what we just wrote." The two paths look contradictory; how is the accumulated 
output not erased? (Also: the description says this only runs for the inner 
LoopEnd of a nested loop, but `main_loop.py:267` calls it for any LoopEnd with 
output state.)
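A toy model of the conflict, assuming both paths act on the same LoopEnd result document across iterations. `InMemoryDocumentStore`, `create_document`, and `run_iterations` are hypothetical; `recreate_each_iteration=True` stands in for the reset path and `False` for the scheduler's skip-create path:

```python
from typing import Dict, List


class InMemoryDocumentStore:
    """Hypothetical stand-in for DocumentFactory-backed result storage."""

    def __init__(self) -> None:
        self.docs: Dict[str, List[str]] = {}

    def create_document(self, uri: str, override_if_exists: bool) -> List[str]:
        # Recreate (empty) when overriding, or when the document is missing.
        if override_if_exists or uri not in self.docs:
            self.docs[uri] = []
        return self.docs[uri]


def run_iterations(uri: str, iterations: int, recreate_each_iteration: bool) -> List[str]:
    """Simulate a LoopEnd writing one row per loop iteration."""
    store = InMemoryDocumentStore()
    for i in range(iterations):
        doc = store.create_document(uri, override_if_exists=recreate_each_iteration)
        doc.append(f"row-{i}")
    return store.docs[uri]
```

Under the recreate path only the last iteration's row survives; under the skip path all of them do. If the truncation is intentional, a comment saying which path wins would answer the question.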



##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,177 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+# Names the loop runtime owns inside the eval namespaces and across the loop
+# boundary; explicitly stripped from every state dict that crosses Loop
+# Start / Loop End so user code can neither read nor persist them.
+# Other reserved names that used to live in user state -- ``loop_counter``,
+# ``LoopStartId``, ``LoopStartStateURI`` -- are no longer in ``self.state``
+# at all; they ride the StateFrame envelope (see ``core.models.payload``)
+# and are stamped/captured by ``MainLoop._process_state_frame``.
+_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+
+
+class LoopStartOperator(TableOperator):
+    """Base class for the runtime side of a Loop Start operator.
+
+    The generator in ``LoopStartOpDesc.scala`` emits a thin
+    ``ProcessLoopStartOperator(LoopStartOperator)`` subclass that does
+    nothing more than wire the user-supplied ``initialization`` and
+    ``output`` expressions into ``open()`` and ``process_table()``; all
+    substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``open()`` runs once when the worker starts. The generated subclass
+      executes the user's ``initialization`` against a fresh ``self.state``
+      dict; after it returns ``self.state`` holds *only* the user's loop
+      variables.
+    * ``process_state(state, port)`` (final) runs once when upstream sends
+      this LoopStart its input state; it merges that state into
+      ``self.state``. The nested pass-through branch and all
+      ``loop_counter`` bookkeeping live in
+      ``MainLoop._process_state_frame``, not here.
+    * ``process_table(table, port)`` is provided by the generated subclass
+      and yields a downstream row via ``eval_output(...)`` against the
+      user's ``output`` expression.
+    * ``produce_state_on_finish(port)`` (final) emits the state crossing
+      the boundary to the matching LoopEnd: user variables plus the
+      pickled input table.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``open()`` and ``process_table()``
+    only. All other methods are ``@overrides.final``; do not override
+    them. After ``open()`` returns, ``self.state`` must be a dict
+    containing the user's loop variables (none of the reserved names in
+    ``_RESERVED_STATE_KEYS``).
+
+    Reserved names
+    --------------
+    * ``loop_counter`` / ``LoopStartId`` / ``LoopStartStateURI`` -- live on
+      the StateFrame envelope (``core.models.payload``), not in
+      ``self.state``. Stamped by this operator's worker via
+      ``MainLoop._compute_loop_start_id``.
+    * ``table`` / ``output`` -- transient names only available inside the
+      ``eval_output`` throwaway namespace; never persisted in
+      ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        # First-entry only: merge upstream state into self.state. The nested
+        # pass-through (state already carrying LoopStartStateURI) and all
+        # loop_counter bookkeeping are owned by the worker runtime
+        # (main_loop._process_state_frame), so this operator never sees the
+        # counter and never mutates the State it is handed.
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def eval_output(self, output_expr: str, table: Table) -> TableLike:
+        # Run the user's `output` expression in a throwaway namespace seeded
+        # with the loop variables and the input `table`. This lets user code
+        # read `table` and define `output` without those reserved names leaking
+        # into -- or being silently clobbered out of -- the persistent loop
+        # state (self.state), addressing the exec-namespace collision.
+        namespace = {**self.state, "table": table}
+        exec("output = " + output_expr, {}, namespace)
+        return namespace["output"]
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        # Emit the user's loop variables plus the pickled input table for the
+        # matching LoopEnd. `table`/`output` are runtime-reserved and are not
+        # kept in self.state, so drop any stray ones before adding the real
+        # pickled table.
+        produced = {
+            key: value
+            for key, value in self.state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        produced["table"] = dumps(Table(self._TableOperator__table_data[port]))

Review Comment:
   **Resolved thread 19, but the fix isn't in the branch.** The reply described 
a `TableOperator._buffered_table(port)` accessor and an Arrow replacement for 
pickle (commit `e281c61b4c`); neither is here. This still reads the parent's 
private field through the mangled name `self._TableOperator__table_data[port]` 
(which breaks silently if `TableOperator` is renamed), and the table is still 
moved with `pickle` (`loads` at line 444) — the remote-code-execution risk from 
the first review. Please re-open both.
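For reference, a self-contained illustration of why the mangled name is fragile and what an accessor buys. The base class is deliberately named `TableOperatorBase` to simulate a rename; `_buffered_table` follows the name in the reply, but its signature here is an assumption, and all classes are hypothetical:

```python
from typing import Dict, List


class TableOperatorBase:
    """Hypothetical base class, named differently from `TableOperator` on purpose."""

    def __init__(self) -> None:
        # Double underscore: Python stores this as `_TableOperatorBase__table_data`.
        self.__table_data: Dict[int, List[dict]] = {}

    def buffer_row(self, port: int, row: dict) -> None:
        self.__table_data.setdefault(port, []).append(row)

    def _buffered_table(self, port: int) -> List[dict]:
        """Protected accessor; the mangled spelling stays inside the owning class."""
        return list(self.__table_data.get(port, []))


class LoopStartOperatorSketch(TableOperatorBase):
    def rows_via_mangled_name(self, port: int) -> List[dict]:
        # Mirrors the PR's pattern: hard-codes the old base-class name,
        # so after the rename this raises AttributeError at runtime.
        return self._TableOperator__table_data[port]

    def rows_via_accessor(self, port: int) -> List[dict]:
        return self._buffered_table(port)
```

This covers only the accessor half of the comment; the `pickle` transport is a separate change.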



##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,177 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+# Names the loop runtime owns inside the eval namespaces and across the loop
+# boundary; explicitly stripped from every state dict that crosses Loop
+# Start / Loop End so user code can neither read nor persist them.
+# Other reserved names that used to live in user state -- ``loop_counter``,
+# ``LoopStartId``, ``LoopStartStateURI`` -- are no longer in ``self.state``
+# at all; they ride the StateFrame envelope (see ``core.models.payload``)
+# and are stamped/captured by ``MainLoop._process_state_frame``.
+_RESERVED_STATE_KEYS: frozenset = frozenset({"table", "output"})
+
+
+class LoopStartOperator(TableOperator):
+    """Base class for the runtime side of a Loop Start operator.
+
+    The generator in ``LoopStartOpDesc.scala`` emits a thin
+    ``ProcessLoopStartOperator(LoopStartOperator)`` subclass that does
+    nothing more than wire the user-supplied ``initialization`` and
+    ``output`` expressions into ``open()`` and ``process_table()``; all
+    substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``open()`` runs once when the worker starts. The generated subclass
+      executes the user's ``initialization`` against a fresh ``self.state``
+      dict; after it returns ``self.state`` holds *only* the user's loop
+      variables.
+    * ``process_state(state, port)`` (final) runs once when upstream sends
+      this LoopStart its input state; it merges that state into
+      ``self.state``. The nested pass-through branch and all
+      ``loop_counter`` bookkeeping live in
+      ``MainLoop._process_state_frame``, not here.
+    * ``process_table(table, port)`` is provided by the generated subclass
+      and yields a downstream row via ``eval_output(...)`` against the
+      user's ``output`` expression.
+    * ``produce_state_on_finish(port)`` (final) emits the state crossing
+      the boundary to the matching LoopEnd: user variables plus the
+      pickled input table.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``open()`` and ``process_table()``
+    only. All other methods are ``@overrides.final``; do not override
+    them. After ``open()`` returns, ``self.state`` must be a dict
+    containing the user's loop variables (none of the reserved names in
+    ``_RESERVED_STATE_KEYS``).
+
+    Reserved names
+    --------------
+    * ``loop_counter`` / ``LoopStartId`` / ``LoopStartStateURI`` -- live on
+      the StateFrame envelope (``core.models.payload``), not in
+      ``self.state``. Stamped by this operator's worker via
+      ``MainLoop._compute_loop_start_id``.
+    * ``table`` / ``output`` -- transient names only available inside the
+      ``eval_output`` throwaway namespace; never persisted in
+      ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        # First-entry only: merge upstream state into self.state. The nested
+        # pass-through (state already carrying LoopStartStateURI) and all
+        # loop_counter bookkeeping are owned by the worker runtime
+        # (main_loop._process_state_frame), so this operator never sees the
+        # counter and never mutates the State it is handed.
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def eval_output(self, output_expr: str, table: Table) -> TableLike:
+        # Run the user's `output` expression in a throwaway namespace seeded
+        # with the loop variables and the input `table`. This lets user code
+        # read `table` and define `output` without those reserved names leaking
+        # into -- or being silently clobbered out of -- the persistent loop
+        # state (self.state), addressing the exec-namespace collision.
+        namespace = {**self.state, "table": table}
+        exec("output = " + output_expr, {}, namespace)
+        return namespace["output"]
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        # Emit the user's loop variables plus the pickled input table for the
+        # matching LoopEnd. `table`/`output` are runtime-reserved and are not
+        # kept in self.state, so drop any stray ones before adding the real
+        # pickled table.
+        produced = {
+            key: value
+            for key, value in self.state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        produced["table"] = dumps(Table(self._TableOperator__table_data[port]))
+        return produced
+
+
+class LoopEndOperator(TableOperator):
+    """Base class for the runtime side of a Loop End operator.
+
+    The generator in ``LoopEndOpDesc.scala`` emits a thin
+    ``ProcessLoopEndOperator(LoopEndOperator)`` subclass that wires the
+    user-supplied ``update`` expression into ``process_state(...)`` (via
+    ``run_update``) and the ``condition`` expression into ``condition()``
+    (via ``eval_condition``); all substantive logic lives here.
+
+    Lifecycle
+    ---------
+    * ``process_table(table, port)`` (final) yields each input table
+      through as-is.
+    * ``process_state(state, port)`` is provided by the generated
+      subclass. It calls ``run_update(update_code, state)`` to unpickle
+      the input table, run the user's ``update`` in a throwaway
+      namespace, stash the table on ``self._loop_table``, and persist
+      only user variables back into ``self.state``. Returns ``None``.
+    * ``condition()`` is the abstract method the generated subclass
+      implements by delegating to ``eval_condition(...)`` against the
+      user's ``condition`` expression. Called by ``MainLoop.complete()``
+      to decide whether to fire the back-edge via
+      ``_jump_to_loop_start``.
+
+    Subclass contract
+    -----------------
+    The generated subclass overrides ``process_state()`` (delegating to
+    ``run_update``) and ``condition()`` (delegating to
+    ``eval_condition``). All other methods are ``@overrides.final``; do
+    not override them.
+
+    Reserved names
+    --------------
+    Same as ``LoopStartOperator``: ``loop_counter`` / ``LoopStartId`` /
+    ``LoopStartStateURI`` live on the StateFrame envelope (never in user
+    state); ``table`` / ``output`` are transient names available only
+    inside ``run_update`` / ``eval_condition``'s throwaway namespace and
+    are stripped from ``self.state``. See ``_RESERVED_STATE_KEYS``.
+    """
+
+    @overrides.final
+    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
+        yield table
+
+    @overrides.final
+    def run_update(self, update_code: str, state: State) -> None:
+        # Run the user's `update` in a throwaway namespace seeded with the
+        # incoming loop variables and the unpickled input table, then persist
+        # only the user variables back into self.state. `table`/`output` are
+        # runtime-reserved and never persist, so user code cannot silently
+        # clobber loop machinery through them. The real input table is kept on
+        # self._loop_table so condition() can read it after the update.
+        from pickle import loads
+
+        table = loads(state["table"])
+        namespace = {
+            key: value
+            for key, value in state.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+        namespace["table"] = table
+        exec(update_code, {}, namespace)
+        self._loop_table = table
+        self.state = {
+            key: value
+            for key, value in namespace.items()
+            if key not in _RESERVED_STATE_KEYS
+        }
+
+    @overrides.final
+    def eval_condition(self, condition_expr: str) -> bool:

Review Comment:
   `eval_condition` reads `self._loop_table` and `self.state`, which are only 
created in `run_update`. `complete()` calls `condition()` for any LoopEnd. If a 
LoopEnd finishes without having consumed state (empty input, or an inner 
LoopEnd that only passed outer-loop state through), this raises 
`AttributeError`. Please confirm that can't happen, or initialize both in 
`__init__`.
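A minimal sketch of the `__init__` option, assuming "no state consumed" should mean "do not fire the back-edge". `LoopEndConditionSketch` is a hypothetical stand-in that keeps only the fields `eval_condition` reads, and it takes the table as a plain list instead of a pickled value:

```python
from typing import Any, Dict, List, Optional

_RESERVED = ("table", "output")


class LoopEndConditionSketch:
    """Hypothetical stand-in for LoopEndOperator's condition path."""

    def __init__(self) -> None:
        # Initialized up front so condition() is safe even if run_update never ran.
        self.state: Dict[str, Any] = {}
        self._loop_table: Optional[List[Any]] = None

    def run_update(self, update_code: str, variables: Dict[str, Any], table: List[Any]) -> None:
        namespace = {**variables, "table": table}
        exec(update_code, {}, namespace)
        self._loop_table = table
        self.state = {k: v for k, v in namespace.items() if k not in _RESERVED}

    def eval_condition(self, condition_expr: str) -> bool:
        if self._loop_table is None:
            # Assumed policy: no state consumed, so there is nothing to loop back on.
            return False
        namespace = {**self.state, "table": self._loop_table}
        return bool(eval(condition_expr, {}, namespace))
```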



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +101,69 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _compute_loop_start_id(self) -> typing.Tuple[str, str]:
+        # A LoopStart stamps its own operator id and the iceberg URI its input
+        # is read from onto the state it emits; the matching LoopEnd reads them
+        # back to jump. These ride the StateFrame envelope, not user state.
+        loop_start_id = get_operator_id(self.context.worker_id)
+        # The URI lives on the upstream operator's output port (which
+        # LoopStart's first materialization reader is reading from). LoopStart
+        # is constrained to a single input port + single reader, so fail loud
+        # rather than silently picking whichever dict iterator yields first --
+        # that would mask future graph mistakes by choosing an arbitrary URI.
+        reader_runnables = (
+            self.context.input_manager.get_input_port_mat_reader_threads()
+        )
+        if len(reader_runnables) != 1:
+            raise RuntimeError(
+                f"LoopStart expected exactly one input port, "
+                f"got {len(reader_runnables)}"
+            )
+        [(_, readers)] = reader_runnables.items()
+        if len(readers) != 1:
+            raise RuntimeError(
+                f"LoopStart expected exactly one input reader on its port, "
+                f"got {len(readers)}"
+            )
+        loop_start_state_uri = VFSURIFactory.state_uri(readers[0].uri)
+        return loop_start_id, loop_start_state_uri
+
+    def _jump_to_loop_start(
+        self, executor: LoopEndOperator, controller_interface
+    ) -> None:
+        state = executor.state
+        controller_interface.jump_to_operator_region(
+            JumpToOperatorRegionRequest(OperatorIdentity(self._loop_start_id))
+        )
+        uri = self._loop_start_state_uri
+        # Strip the per-iteration scratch (`table`, `output`) so only the
+        # user-visible loop state is written back to LoopStart's input. The
+        # loop metadata (counter, LoopStartId, LoopStartStateURI) is owned by
+        # the runtime and never lived in this dict.
+        for key in ("table", "output"):
+            state.pop(key, None)
+        writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+        # The back-edge fires only after the matching LoopEnd consumed at
+        # loop_counter == 0, so the next iteration's input starts at depth 0.
+        writer.put_one(State(state).to_tuple(0))
+        writer.close()
+
     def complete(self) -> None:
         """
         Complete the DataProcessor, marking state to COMPLETED, and notify the
         controller.
         """
         # flush the buffered console prints
         self._check_and_report_console_messages(force_flush=True)
-        self.context.executor_manager.executor.close()
+        controller_interface = self._async_rpc_client.controller_stub()
+        executor = self.context.executor_manager.executor
+        if isinstance(executor, LoopEndOperator) and executor.condition():

Review Comment:
   On thread 16, the "same as a Python UDF" reply holds for `update` / 
`initialization` / `output`, but not for `condition()`: it runs here on the 
main loop thread inside `complete()`, before `close()` and the COMPLETED 
transition — not on the guarded path a UDF error would take. A typo in 
`condition` fails the worker thread instead of being reported. Worth guarding 
this one call, or noting the difference.
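One way to guard just this call, sketched as a hypothetical helper. `evaluate_loop_condition` and the `report_error` callback are assumptions; the real handler should report through whatever path UDF errors already use. The assumed policy is to report the error and not fire the back-edge:

```python
from typing import Callable, List


def evaluate_loop_condition(
    condition: Callable[[], bool],
    report_error: Callable[[Exception], None],
) -> bool:
    """Evaluate a LoopEnd condition without letting a user error escape the thread."""
    try:
        return bool(condition())
    except Exception as err:  # a typo in `condition` surfaces as SyntaxError/NameError
        report_error(err)
        return False
```

In `complete()` the check would then read roughly `isinstance(executor, LoopEndOperator) and evaluate_loop_condition(executor.condition, report)`, with `report` being a hypothetical callback.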



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -189,20 +253,52 @@ def process_input_tuple(self) -> None:
                     output_tuple
                 )
 
-    def process_input_state(self) -> None:
+    def process_input_state(
+        self,
+        output_loop_counter: int = 0,
+        output_loop_start_id: str = "",
+        output_loop_start_state_uri: str = "",
+    ) -> None:
         self._switch_context()
         output_state = self.context.state_processing_manager.get_output_state()
         if output_state is not None:
-            for to, batch in self.context.output_manager.emit_state(output_state):
-                self._output_queue.put(
-                    DataElement(
-                        tag=ChannelIdentity(
-                            ActorVirtualIdentity(self.context.worker_id), to, False
-                        ),
-                        payload=batch,
-                    )
+            executor = self.context.executor_manager.executor
+            if isinstance(executor, LoopEndOperator):
+                self.context.output_manager.reset_storage()

Review Comment:
   `reset_storage()` runs for any LoopEnd with output state here, regardless of 
`loop_counter` — but the description says it only fires for the inner LoopEnd 
of a nested loop. See the note on `output_manager.py:241`.



##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala:
##########
@@ -66,6 +67,23 @@ class WorkflowExecutionService(
 ) extends SubscriptionManager
     with LazyLogging {
 
+  // Loops require materialized edges to carry state between iterations.
+  // Previously we silently rewrote the user's execution mode to
+  // MATERIALIZED here, but that left the UI displaying the user's
+  // original (e.g. PIPELINED) choice while the engine ran something
+  // else -- the user had no way to tell the two had diverged. Fail
+  // loudly instead: surface a fatal error with an actionable message so
+  // the user can update the workflow setting and re-run.
+  if (
+    request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])

Review Comment:
   **Resolved thread 11, but the fix isn't in the branch.** The reply described 
a generic `LogicalOp.requiresMaterializedExecution` flag and a test (commit 
`1848ce00fb`); neither exists. Line 78 still uses 
`isInstanceOf[LoopStartOpDesc]`, tying this service to one operator class, with 
no test. It also only checks LoopStart, so a plan with only a LoopEnd would 
skip the check. Please re-open.
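The shape of the generic check, sketched in Python for brevity (the service itself is Scala). `requires_materialized_execution` mirrors the flag named in the reply; every class and function here is hypothetical. Setting the flag on both loop operators also covers the LoopEnd-only plan:

```python
from typing import Iterable


class LogicalOpSketch:
    """Hypothetical base class carrying a behavior flag instead of a type check."""

    requires_materialized_execution: bool = False


class LoopStartOpSketch(LogicalOpSketch):
    requires_materialized_execution = True


class LoopEndOpSketch(LogicalOpSketch):
    requires_materialized_execution = True


class FilterOpSketch(LogicalOpSketch):
    pass


def check_execution_mode(operators: Iterable[LogicalOpSketch], execution_mode: str) -> None:
    """Fail loudly if any operator needs MATERIALIZED mode and the plan is not in it."""
    needing = sorted(
        {type(op).__name__ for op in operators if op.requires_materialized_execution}
    )
    if needing and execution_mode != "MATERIALIZED":
        raise ValueError(
            f"{', '.join(needing)} require MATERIALIZED execution mode; "
            f"the workflow is set to {execution_mode}"
        )
```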



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +576,17 @@ class RegionExecutionCoordinator(
          region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
          schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
-        DocumentFactory.createDocument(resultURI, schema)
-        DocumentFactory.createDocument(stateURI, State.schema)
+        // LoopEnd operators may re-execute the region multiple times; on
+        // subsequent iterations the result/state documents already exist,
+        // and `createDocument` (overrideIfExists=true) would clobber them.
+        // Skip the create call when the document is already there.
+        val isLoopEndRegion = region.getOperators.exists(_.isLoopEnd)

Review Comment:
   **Resolved threads 21 and 22, but the fixes aren't in the branch.** The 
replies cited commits (`ca9e5ce8cc`, `540b7ba274`, `bbec98282e`) that aren't 
here. The flag is still the type-named `isLoopEnd`, and this skip-create branch 
is still untested. Please rename it to describe the behavior, and add a test 
that pre-creates the documents and checks they're reused, not recreated.
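A sketch of the requested test, in Python for brevity rather than the Scala test suite. `DocumentStoreSketch`, `prepare_output_documents`, and the behavior-named `preserve_output_across_rerun` flag are hypothetical. The store always overwrites on create, like `overrideIfExists=true`, so reuse can only come from the skip branch:

```python
from typing import Dict, Iterable, List


class DocumentStoreSketch:
    """Hypothetical store whose create always overwrites (overrideIfExists=true)."""

    def __init__(self) -> None:
        self.docs: Dict[str, List[str]] = {}
        self.create_calls = 0

    def create_document(self, uri: str) -> List[str]:
        self.create_calls += 1
        self.docs[uri] = []
        return self.docs[uri]


def prepare_output_documents(
    store: DocumentStoreSketch, uris: Iterable[str], preserve_output_across_rerun: bool
) -> None:
    """Create each output document unless the operator asks to keep it."""
    for uri in uris:
        if preserve_output_across_rerun and uri in store.docs:
            continue  # reuse what the previous iteration wrote
        store.create_document(uri)


def test_rerun_reuses_existing_documents() -> None:
    store = DocumentStoreSketch()
    uris = ["result", "state"]
    # Pre-create the documents and write a row, as a first iteration would.
    prepare_output_documents(store, uris, preserve_output_across_rerun=True)
    store.docs["result"].append("iteration-0 row")
    before = dict(store.docs)
    # Re-run the region: nothing should be recreated.
    prepare_output_documents(store, uris, preserve_output_across_rerun=True)
    assert store.create_calls == 2
    assert all(store.docs[uri] is before[uri] for uri in uris)
    assert store.docs["result"] == ["iteration-0 row"]


def test_without_flag_documents_are_recreated() -> None:
    store = DocumentStoreSketch()
    prepare_output_documents(store, ["result"], preserve_output_across_rerun=False)
    store.docs["result"].append("iteration-0 row")
    prepare_output_documents(store, ["result"], preserve_output_across_rerun=False)
    assert store.create_calls == 2
    assert store.docs["result"] == []
```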



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala:
##########
@@ -198,6 +198,7 @@ case class PhysicalOp(
     // schema propagation function
     propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => schemas),
     isOneToManyOp: Boolean = false,
+    isLoopEnd: Boolean = false,

Review Comment:
   **Resolved thread 22, but the rename isn't in the branch.** Still 
`isLoopEnd` — named after the operator, not after the behavior the scheduler 
checks ("keep this operator's output across a region re-run"). Renaming it lets 
the next operator that needs the same behavior reuse it without a misleading 
name.



##########
amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.engine.e2e
+
+import com.twitter.util.{Await, Duration, Promise}
+import org.apache.pekko.actor.{ActorSystem, Props}
+import org.apache.pekko.testkit.{ImplicitSender, TestKit}
+import org.apache.pekko.util.Timeout
+import org.apache.texera.amber.clustering.SingleNodeListener
+import org.apache.texera.amber.core.workflow.{
+  ExecutionMode,
+  PortIdentity,
+  WorkflowContext,
+  WorkflowSettings
+}
+import org.apache.texera.amber.engine.architecture.controller._
+import org.apache.texera.amber.engine.architecture.rpc.controlcommands.EmptyRequest
+import org.apache.texera.amber.engine.architecture.rpc.controlreturns.WorkflowAggregatedState.COMPLETED
+import org.apache.texera.amber.engine.common.AmberRuntime
+import org.apache.texera.amber.engine.common.client.AmberClient
+import org.apache.texera.amber.engine.e2e.TestUtils.{
+  buildWorkflow,
+  cleanupWorkflowExecutionData,
+  initiateTexeraDBForTestCases,
+  setUpWorkflowExecutionData
+}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.loop.{LoopEndOpDesc, LoopStartOpDesc}
+import org.apache.texera.amber.operator.source.scan.text.TextInputSourceOpDesc
+import org.apache.texera.amber.tags.IntegrationTest
+import org.apache.texera.workflow.LogicalLink
+import org.scalatest.flatspec.AnyFlatSpecLike
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, Outcome, Retries}
+
+import scala.concurrent.duration.DurationInt
+
+/**
+  * End-to-end loop tests: run a real TextInput -> LoopStart -> LoopEnd workflow
+  * through the engine (controller + Python workers + the DCM back-jump and
+  * region re-execution) and assert it reaches COMPLETED. Termination is the
+  * assertion -- a loop whose counter / state hand-off were broken by the
+  * "loop_counter / LoopStartId / LoopStartStateURI off State" and guarded
+  * exec-namespace changes would hang and time out rather than complete.
+  *
+  * Tagged @IntegrationTest because it spawns Python workers; routed to the
+  * `amber-integration` CI job.
+  */
+@IntegrationTest
+class LoopIntegrationSpec
+    extends TestKit(ActorSystem("LoopIntegrationSpec", AmberRuntime.pekkoConfig))
+    with ImplicitSender
+    with AnyFlatSpecLike
+    with BeforeAndAfterAll
+    with BeforeAndAfterEach
+    with Retries {
+
+  override def withFixture(test: NoArgTest): Outcome =
+    withRetry { super.withFixture(test) }
+
+  implicit val timeout: Timeout = Timeout(5.seconds)
+
+  override protected def beforeEach(): Unit = setUpWorkflowExecutionData()
+
+  override protected def afterEach(): Unit = cleanupWorkflowExecutionData()
+
+  override def beforeAll(): Unit = {
+    system.actorOf(Props[SingleNodeListener](), "cluster-info")
+    Class.forName("org.postgresql.Driver")
+    initiateTexeraDBForTestCases()
+  }
+
+  override def afterAll(): Unit = {
+    TestKit.shutdownActorSystem(system)
+  }
+
+  // Loops require MATERIALIZED execution mode (the cross-region state channel
+  // is the loop back-edge).
+  private def materializedContext(): WorkflowContext =
+    new WorkflowContext(
+      workflowSettings = WorkflowSettings(
+        dataTransferBatchSize = 400,
+        executionMode = ExecutionMode.MATERIALIZED
+      )
+    )
+
+  private def runToCompletion(
+      operators: List[LogicalOp],
+      links: List[LogicalLink]
+  ): Unit = {
+    val workflow = buildWorkflow(operators, links, materializedContext())
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      _ => {}
+    )
+    val completion = Promise[Unit]()
+    client.registerCallback[FatalError](evt => {
+      completion.setException(evt.e)
+      client.shutdown()
+    })
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) completion.setDone()
+    })
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    // A correct loop terminates; a broken one hangs until this deadline.
+    Await.result(completion, Duration.fromMinutes(3))
+    client.shutdown()
+  }
+
+  private def textInput(text: String): TextInputSourceOpDesc = {
+    val op = new TextInputSourceOpDesc()
+    op.textInput = text
+    op
+  }
+
+  private def loopStart(initialization: String, output: String): LoopStartOpDesc = {
+    val op = new LoopStartOpDesc()
+    op.initialization = initialization
+    op.output = output
+    op
+  }
+
+  private def loopEnd(update: String, condition: String): LoopEndOpDesc = {
+    val op = new LoopEndOpDesc()
+    op.update = update
+    op.condition = condition
+    op
+  }
+
+  private def link(from: LogicalOp, to: LogicalOp): LogicalLink =
+    LogicalLink(from.operatorIdentifier, PortIdentity(), to.operatorIdentifier, PortIdentity())
+
+  "Engine" should "run a single TextInput -> LoopStart -> LoopEnd loop to completion" in {
+    val src = textInput("1\n2\n3")
+    val start = loopStart("i = 0", "table.iloc[i]")
+    val end = loopEnd("i += 1", "i < len(table)")
+    runToCompletion(
+      List(src, start, end),
+      List(link(src, start), link(start, end))
+    )
+    succeed // reaching COMPLETED within the deadline is the assertion

Review Comment:
   Both cases only check that the workflow reaches COMPLETED, then `succeed` — 
they never read the output or the iteration count. For a loop, the iteration 
count is the main thing to verify: a counter bug that still terminates would 
pass this test. The description's "3 iterations" / "9 nested" are what to 
assert here.
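A pure-Python model of the single-loop case that derives the expected count from the test's own expressions. `simulate_loop` is hypothetical and uses a plain list, so `table.iloc[i]` becomes `table[i]`; it also assumes TextInput turns `1\n2\n3` into one row per line. Under those assumptions it gives 3 iterations emitting the three rows, which is what the spec could assert instead of `succeed`:

```python
from typing import Any, Dict, List, Tuple

_RESERVED = ("table", "output")


def simulate_loop(
    table: List[Any], initialization: str, output: str, update: str, condition: str
) -> Tuple[int, List[Any]]:
    """Hypothetical model of LoopStart -> LoopEnd iterations; returns (count, rows)."""
    state: Dict[str, Any] = {}
    exec(initialization, {}, state)
    iterations = 0
    emitted: List[Any] = []
    while True:
        iterations += 1
        # LoopStart: evaluate `output` in a throwaway namespace.
        ns = {**state, "table": table}
        exec("output = " + output, {}, ns)
        emitted.append(ns["output"])
        # LoopEnd: run `update`, keep only user variables, then test `condition`.
        ns = {**state, "table": table}
        exec(update, {}, ns)
        state = {k: v for k, v in ns.items() if k not in _RESERVED}
        if not eval(condition, {}, {**state, "table": table}):
            return iterations, emitted
```

The nested case would assert the description's 9 the same way; its exact layout isn't in this diff, so it is not modeled here.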



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
