Copilot commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3262435359


##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+
+class LoopStartOpDesc extends LogicalOp {
+  @JsonProperty(required = true, defaultValue = "i = 0")
+  @JsonSchemaTitle("Initialization")
+  var initialization: String = _
+
+  @JsonProperty(required = true, defaultValue = "table.iloc[i]")
+  @JsonSchemaTitle("Output")
+  var output: String = _
+
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp =
+    PhysicalOp
+      .oneToOnePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithCode(generatePythonCode(), "python")
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withSuggestedWorkerNum(1)
+      .withParallelizable(false)
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Loop Start",
+      "Begin a loop that iterates over rows of the input table; pairs with Loop End.",
+      OperatorGroupConstants.CONTROL_GROUP,
+      inputPorts = List(InputPort()),
+      outputPorts = List(OutputPort())
+    )
+
+  def generatePythonCode(): String = {
+    s"""
+       |from pytexera import *
+       |class ProcessLoopStartOperator(LoopStartOperator):
+       |    @overrides
+       |    def open(self):
+       |        self.state = {"loop_counter": 0}
+       |        exec("$initialization", {}, self.state)
+       |
+       |    @overrides
+       |    def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]]:
+       |        self.state["table"] = table
+       |        exec("output = $output", {}, self.state)

Review Comment:
   User-supplied expressions are interpolated raw into a double-quoted Python 
`exec(...)` string. A double quote or newline in `initialization` or `output` 
will produce a Python SyntaxError at operator-build time, a backslash can either 
break the literal or be silently reinterpreted as an escape sequence, and the 
interpolation is a code-injection vector (e.g., an `output` containing `"); import os; 
os.system("...` would escape the exec call entirely). Consider injecting these 
expressions as proper indented Python source rather than as `exec` string 
literals, or at minimum escape the value (e.g., wrap in triple-quotes and 
reject embedded triple-quotes).
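A minimal sketch of the "escape the value" option follows, assuming the generated line keeps the `exec(<source>, {}, self.state)` shape. It is written in Python for illustration; the PR builds this string in Scala, so the real fix would need an equivalent escaper there. `exec_call_for` is a hypothetical helper. It relies on `repr()`, which always yields a valid Python string literal, and on `ast.parse`, which rejects malformed expressions before any code is generated.

```python
import ast
from typing import Optional


def exec_call_for(expr: str, target: Optional[str] = None) -> str:
    """Hypothetical helper: render `exec(<literal>, {}, self.state)` safely.

    repr() always produces a valid Python string literal, so quotes,
    backslashes and newlines in `expr` stay inside the literal instead of
    terminating it early.
    """
    source = f"{target} = {expr}" if target else expr
    try:
        # Fail at code-generation time with a readable message rather than
        # with a SyntaxError from deep inside the Python worker.
        ast.parse(source, mode="exec")
    except SyntaxError as err:
        raise ValueError(f"invalid loop expression {expr!r}: {err.msg}") from err
    # {{}} renders as a literal {} (the empty globals dict).
    return f"exec({source!r}, {{}}, self.state)"
```

With this approach, a payload like the one above either fails to parse and is rejected, or stays inert inside a single string literal. Note that the `exec` itself still runs arbitrary user code by design.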
   



##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity, WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort, PhysicalOp}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants, OperatorInfo}
+
+class LoopEndOpDesc extends LogicalOp {
+  @JsonProperty(required = true, defaultValue = "i += 1")
+  @JsonSchemaTitle("Update")
+  var update: String = _
+
+  @JsonProperty(required = true, defaultValue = "i < len(table)")
+  @JsonSchemaTitle("Condition")
+  var condition: String = _
+
+  override def getPhysicalOp(
+      workflowId: WorkflowIdentity,
+      executionId: ExecutionIdentity
+  ): PhysicalOp =
+    PhysicalOp
+      .oneToOnePhysicalOp(
+        workflowId,
+        executionId,
+        operatorIdentifier,
+        OpExecWithCode(generatePythonCode(), "python")
+      )
+      .withInputPorts(operatorInfo.inputPorts)
+      .withOutputPorts(operatorInfo.outputPorts)
+      .withSuggestedWorkerNum(1)
+      .withParallelizable(false)
+      .withIsLoopEnd(true)
+
+  override def operatorInfo: OperatorInfo =
+    OperatorInfo(
+      "Loop End",
+      "Close a loop body and decide whether to iterate again based on a condition; pairs with Loop Start.",
+      OperatorGroupConstants.CONTROL_GROUP,
+      inputPorts = List(InputPort()),
+      outputPorts = List(OutputPort())
+    )
+
+  def generatePythonCode(): String = {
+    s"""
+       |from pytexera import *
+       |class ProcessLoopEndOperator(LoopEndOperator):
+       |    @overrides
+       |    def process_state(self, state: State, port: int) -> Optional[State]:
+       |      loop_counter = int(state.get("loop_counter", 0))
+       |      if loop_counter > 0:
+       |        state["loop_counter"] = loop_counter - 1
+       |        return state
+       |      self.state = dict(state)
+       |      from pickle import loads
+       |      self.state["table"] = loads(self.state["table"])
+       |      exec("$update", {}, self.state)
+       |      return None
+       |
+       |    @overrides
+       |    def condition(self) -> bool:
+       |      exec("output = $condition", {}, self.state)

Review Comment:
   Same problem as `LoopStartOpDesc`: `update` and `condition` are interpolated 
directly into double-quoted `exec(...)` literals. Any expression containing 
`"`, `\`, or a newline can produce invalid Python, and the values are 
effectively executable code-injection sites. Inline them as plain Python 
statements (e.g., on their own indented line) or escape them defensively.
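Whichever fix is chosen, a cheap regression guard is to compile the generated source for a few ordinary but awkward inputs. The sketch below mirrors the LoopEnd template in Python. `naive_render` and `escaped_render` are hypothetical stand-ins for the Scala `generatePythonCode()`, not Texera code. The test shows that the raw template breaks on inputs that are valid Python, while a `repr()`-escaped template does not.

```python
from typing import Callable, List, Tuple


def naive_render(update: str, condition: str) -> str:
    # Mirrors the PR's template: raw interpolation into "..." literals.
    return (
        "def process(state):\n"
        f'    exec("{update}", {{}}, state)\n'
        f'    exec("output = {condition}", {{}}, state)\n'
        '    return state["output"]\n'
    )


def escaped_render(update: str, condition: str) -> str:
    # repr() yields a valid literal for any string, so each exec argument
    # is exactly the user's text.
    cond_src = "output = " + condition
    return (
        "def process(state):\n"
        f"    exec({update!r}, {{}}, state)\n"
        f"    exec({cond_src!r}, {{}}, state)\n"
        '    return state["output"]\n'
    )


def compiles(source: str) -> bool:
    try:
        compile(source, "<generated>", "exec")
    except SyntaxError:
        return False
    return True


# Inputs that are ordinary Python, yet break the raw template.
AWKWARD_INPUTS: List[Tuple[str, str]] = [
    ("i += 1", 'names[i] != "stop"'),  # embedded double quotes
    ("i += 1\nj = i * 2", "i < 3"),    # multi-line update
]


def failing_inputs(render: Callable[[str, str], str]) -> List[Tuple[str, str]]:
    """Return the inputs for which `render` emits uncompilable source."""
    return [pair for pair in AWKWARD_INPUTS if not compiles(render(*pair))]
```

A check like `failing_inputs(...) == []` in a unit test would catch a regression to raw interpolation as soon as it happens.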



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _attach_loop_start_id(self, output_state: State) -> None:
+        if "LoopStartId" in output_state:
+            return
+        output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+            "-main-0", 1
+        )[0]

Review Comment:
   Deriving the LoopStart operator id from `worker_id` by string-splitting on 
`-` and `-main-0` is brittle: it silently assumes the worker name ends with 
`-main-0` and that the operator id itself never contains the literal `-main-0`. 
`SpecialPhysicalOpFactory` and other code paths already produce layer names 
containing underscores/hyphens, so a future renaming of the layer suffix or 
worker index will break this without any error. Prefer using the operator 
identity already available from the worker's context (e.g., the parsed 
`ActorVirtualIdentity` / physical-op id) rather than re-parsing the worker id 
string.
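To make the failure mode concrete, the sketch below runs the PR's expression on a worker id whose index is `1` instead of `0`. The id layout (`<prefix>-<operator id>-<layer>-<index>`) is an assumption inferred only from the `split`/`rsplit` calls in the PR, and `WorkerRef` is a hypothetical structured identity. Ideally the worker would receive such a value directly. If string parsing is truly unavoidable, it should at least fail loudly on unexpected input.

```python
import re
from dataclasses import dataclass


def brittle_loop_start_id(worker_id: str) -> str:
    # The expression from the PR, unchanged.
    return worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0]


# ASSUMED layout "<prefix>-<operator id>-<layer>-<index>", inferred only from
# the split/rsplit above; the real format should come from Texera itself.
_WORKER_ID = re.compile(r"^[^-]+-(?P<op>.+)-(?P<layer>[^-]+)-(?P<index>\d+)$")


@dataclass(frozen=True)
class WorkerRef:
    """Hypothetical structured identity a worker could be handed directly."""

    operator_id: str
    layer: str
    index: int

    @classmethod
    def parse(cls, worker_id: str) -> "WorkerRef":
        match = _WORKER_ID.match(worker_id)
        if match is None:
            # Fail loudly instead of returning a plausible-looking wrong id.
            raise ValueError(f"unrecognized worker id {worker_id!r}")
        return cls(match["op"], match["layer"], int(match["index"]))
```

For `"Worker:WF1-LoopStart-op-main-1"`, the brittle version returns `"LoopStart-op-main-1"` without raising any error, while `WorkerRef.parse` yields `operator_id == "LoopStart-op"`.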
   



##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+class LoopStartOperator(TableOperator):
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        if "LoopStartStateURI" in state:
+            state["loop_counter"] += 1

Review Comment:
   `LoopStartOperator.process_state` does `state["loop_counter"] += 1` when the 
incoming state is from an outer scope. This silently assumes the incoming state 
already contains a `loop_counter` key, which is only true if the outer scope 
was also produced by a `LoopStart` whose `open()` initialized it. Any non-loop 
upstream that supplies a state carrying `LoopStartStateURI` (or an outer 
LoopStart whose user `initialization` happens not to define `loop_counter`) 
will raise `KeyError` at runtime. Either default-initialize via 
`state.get("loop_counter", 0) + 1` or assert the precondition with a clear 
error.
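Both suggested fixes fit in a few lines. In the sketch below, `bump_outer_loop_counter` is a hypothetical helper standing in for the outer-scope branch of `LoopStartOperator.process_state`, and a plain `dict` stands in for pytexera's `State`. By default it treats a missing counter as `0`; with `strict=True` it instead raises an error that names the broken precondition.

```python
from typing import Any, Dict

State = Dict[str, Any]  # plain-dict stand-in for pytexera's State


def bump_outer_loop_counter(state: State, strict: bool = False) -> State:
    """Hardened outer-scope branch of LoopStartOperator.process_state (sketch)."""
    if strict and "loop_counter" not in state:
        raise KeyError(
            "state carries LoopStartStateURI but no 'loop_counter'; "
            "expected it to come from an enclosing Loop Start"
        )
    # Default-initialize instead of assuming the key exists.
    state["loop_counter"] = int(state.get("loop_counter", 0)) + 1
    return state
```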
   



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _attach_loop_start_id(self, output_state: State) -> None:
+        if "LoopStartId" in output_state:
+            return
+        output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+            "-main-0", 1
+        )[0]
+        # The URI lives on the upstream operator's output port (which
+        # LoopStart's first materialization reader is reading from).
+        reader_runnables = (
+            self.context.input_manager.get_input_port_mat_reader_threads()
+        )
+        output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+            next(iter(reader_runnables.values()))[0].uri
+        )
+
+    def _jump_to_loop_start(
+        self, executor: LoopEndOperator, controller_interface
+    ) -> None:
+        state = executor.state
+        controller_interface.jump_to_operator_region(
+            JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+        )
+        uri = state["LoopStartStateURI"]
+        # Strip the per-iteration scratch (`table`, `output`) and the
+        # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
+        # user-visible loop state is written back to LoopStart's input.
+        for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+            state.pop(key, None)
+        writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+        writer.put_one(State(state).to_tuple())
+        writer.close()
+
     def complete(self) -> None:
         """
         Complete the DataProcessor, marking state to COMPLETED, and notify the
         controller.
         """
         # flush the buffered console prints
         self._check_and_report_console_messages(force_flush=True)
-        self.context.executor_manager.executor.close()
+        controller_interface = self._async_rpc_client.controller_stub()
+        executor = self.context.executor_manager.executor
+        if isinstance(executor, LoopEndOperator) and executor.condition():
+            self._jump_to_loop_start(executor, controller_interface)

Review Comment:
   In `complete()`, `executor.condition()` is invoked for every 
`LoopEndOperator` worker, including nested LoopEnds whose `process_state` only 
saw the pass-through branch (`loop_counter > 0`) and therefore never executed 
`self.state = dict(state)`. In that case `self.state` either does not exist or 
is stale from a previous iteration, so `condition()` may raise 
`AttributeError`/`KeyError` or — worse — return a stale `True` and fire an 
unintended `jump_to_operator_region` writing garbage state back to LoopStart. 
Gate the `condition()`/jump on having actually absorbed a terminal state this 
iteration (e.g., remember whether `process_state` took the `loop_counter == 0` 
branch).
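One way to implement the suggested gating is a flag that only the terminal branch sets and that `complete()` consumes. In the sketch below, `LoopEndSketch` is a hypothetical stand-in for `LoopEndOperator`. The condition is passed as a callable to keep the example free of `exec`, and `should_jump` is an invented name.

```python
from typing import Any, Callable, Dict, Optional


class LoopEndSketch:
    """Hypothetical LoopEnd that remembers whether this iteration absorbed
    a terminal (loop_counter == 0) state."""

    def __init__(self, condition: Callable[[Dict[str, Any]], bool]) -> None:
        self._condition = condition
        self.state: Optional[Dict[str, Any]] = None
        self._absorbed_terminal_state = False

    def process_state(self, state: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        counter = int(state.get("loop_counter", 0))
        if counter > 0:
            # Pass-through for an enclosing loop: self.state is not touched.
            state["loop_counter"] = counter - 1
            return state
        self.state = dict(state)
        self._absorbed_terminal_state = True
        return None

    def should_jump(self) -> bool:
        # Evaluate the condition only if this iteration took the terminal
        # branch; consume the flag so stale state is never reused.
        if not self._absorbed_terminal_state:
            return False
        self._absorbed_terminal_state = False
        return bool(self._condition(self.state))
```

With a flag like this, the check in `complete()` would become something like `isinstance(executor, LoopEndOperator) and executor.should_jump()`.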



##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
             target=self.data_processor.run, daemon=True, name="data_processor_thread"
         ).start()
 
+    def _attach_loop_start_id(self, output_state: State) -> None:
+        if "LoopStartId" in output_state:
+            return
+        output_state["LoopStartId"] = self.context.worker_id.split("-", 1)[1].rsplit(
+            "-main-0", 1
+        )[0]
+        # The URI lives on the upstream operator's output port (which
+        # LoopStart's first materialization reader is reading from).
+        reader_runnables = (
+            self.context.input_manager.get_input_port_mat_reader_threads()
+        )
+        output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+            next(iter(reader_runnables.values()))[0].uri
+        )
+
+    def _jump_to_loop_start(
+        self, executor: LoopEndOperator, controller_interface
+    ) -> None:
+        state = executor.state
+        controller_interface.jump_to_operator_region(
+            JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+        )
+        uri = state["LoopStartStateURI"]
+        # Strip the per-iteration scratch (`table`, `output`) and the
+        # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
+        # user-visible loop state is written back to LoopStart's input.
+        for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+            state.pop(key, None)
+        writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+        writer.put_one(State(state).to_tuple())
+        writer.close()

Review Comment:
   The hardcoded state-dict keys `"LoopStartId"`, `"LoopStartStateURI"`, 
`"loop_counter"`, `"table"`, and `"output"` live in user state alongside 
arbitrary user variables (the `initialization` block writes into `self.state` 
via `exec(..., self.state)`). A user whose loop body uses any of these names 
(especially `table` or `output`, which are the documented defaults in the 
operator UI) will collide silently: their values get stripped on writeback, and 
`condition` evaluating `i < len(table)` may suddenly see a pickled bytes object 
after the strip. Consider namespacing the loop machinery under a single 
reserved key (e.g., `"__loop__": {...}`) so user state is untouched.
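A sketch of the namespacing idea follows, assuming a single hypothetical reserved key `"__loop__"`. Loop metadata goes into a nested dict, and writeback filters out only that key, so user variables named `table` or `output` pass through unchanged. Per-iteration scratch such as the pickled table could live under the same key. `setdefault` keeps the first value written, similar to the early return in `_attach_loop_start_id`.

```python
from typing import Any, Dict, Tuple

LOOP_KEY = "__loop__"  # hypothetical single reserved key


def attach_loop_metadata(state: Dict[str, Any], start_id: str, state_uri: str) -> None:
    # Keep existing entries so an enclosing loop's metadata is not overwritten.
    meta = state.setdefault(LOOP_KEY, {})
    meta.setdefault("start_id", start_id)
    meta.setdefault("state_uri", state_uri)


def split_for_writeback(state: Dict[str, Any]) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Separate user variables from loop machinery without dropping user keys."""
    user_state = {k: v for k, v in state.items() if k != LOOP_KEY}
    return user_state, dict(state.get(LOOP_KEY, {}))
```

`_jump_to_loop_start` could then write back `user_state` and read the target id and URI from the returned metadata instead of popping individual keys.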



##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) -> Iterator[Optional[TableLike]
             time, or None.
         """
         yield
+
+
+class LoopStartOperator(TableOperator):
+    @overrides.final
+    def process_state(self, state: State, port: int) -> Optional[State]:
+        if "LoopStartStateURI" in state:
+            state["loop_counter"] += 1
+            return state
+        self.state.update(state)
+        return None
+
+    @overrides.final
+    def produce_state_on_finish(self, port: int) -> State:
+        from pickle import dumps
+
+        self.state["table"] = dumps(Table(self._TableOperator__table_data[port]))
+        return dict(self.state)

Review Comment:
   Reaching into `self._TableOperator__table_data` from a subclass relies on 
Python's private name-mangling and tightly couples `LoopStartOperator` to the 
internal storage detail of `TableOperator`. Any rename of 
`TableOperator.__table_data` (a private attribute, so legitimately renameable 
without notice) silently breaks loops. Consider exposing a protected accessor 
on `TableOperator` (e.g., `_get_table_data(port)`) and using it here, or store 
the pickled table inside `process_table` instead.
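The accessor option might look like the sketch below. `TableOperatorSketch` and `LoopStartSketch` are hypothetical stand-ins, and a list of row dicts replaces pytexera's `Table`. The storage attribute stays private, and the subclass depends only on the protected `_get_table_data`, so renaming the attribute no longer breaks it.

```python
import pickle
from typing import Any, Dict, List


class TableOperatorSketch:
    """Hypothetical stand-in for TableOperator: rows buffered per port."""

    def __init__(self) -> None:
        # Still private; name-mangled to _TableOperatorSketch__table_data.
        self.__table_data: Dict[int, List[dict]] = {}

    def _buffer_row(self, row: dict, port: int) -> None:
        self.__table_data.setdefault(port, []).append(row)

    def _get_table_data(self, port: int) -> List[dict]:
        """Protected accessor: subclasses rely on this, not the mangled name."""
        return list(self.__table_data.get(port, []))


class LoopStartSketch(TableOperatorSketch):
    def __init__(self) -> None:
        super().__init__()
        self.state: Dict[str, Any] = {}

    def produce_state_on_finish(self, port: int) -> Dict[str, Any]:
        self.state["table"] = pickle.dumps(self._get_table_data(port))
        return dict(self.state)
```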



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:
+        port_id = self.get_port_ids()[0]
+        storage_uri_base = self._storage_uri_base
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(
+            VFSURIFactory.result_uri(storage_uri_base),
+            self._ports[port_id].get_schema(),
+        )
+        DocumentFactory.create_document(
+            VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA
+        )
+        self.set_up_port_storage_writer(port_id, storage_uri_base)

Review Comment:
   `reset_storage` blindly dereferences `self._storage_uri_base` and 
`self.get_port_ids()[0]`. If it is ever invoked before 
`set_up_port_storage_writer` has run (e.g., a LoopEnd that received state 
without yet having its output writer provisioned, or an operator with zero 
output ports), this will raise `TypeError`/`IndexError` with no context. Add a 
guard (or assertion with a descriptive error) and consider asserting that the 
executor really is a LoopEnd at the call site so this state-mutation is scoped 
to where it is meaningful.
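A sketch of the suggested guards follows, using the `reset_loopend_storage` name proposed in the next comment. `OutputManagerSketch` is a hypothetical stand-in. The actual document recreation and writer re-setup from the PR are replaced by a recorded call, so the example runs without Texera's storage layer.

```python
from typing import Dict, List, Optional, Tuple


class OutputManagerSketch:
    """Hypothetical stand-in for OutputManager, reduced to the reset path."""

    def __init__(self) -> None:
        self._storage_uri_base: Optional[str] = None
        self._ports: Dict[str, object] = {}
        self.resets: List[Tuple[str, str]] = []  # records performed resets

    def get_port_ids(self) -> List[str]:
        return list(self._ports)

    def reset_loopend_storage(self) -> None:
        port_ids = self.get_port_ids()
        if not port_ids:
            raise RuntimeError("reset_loopend_storage: operator has no output ports")
        if self._storage_uri_base is None:
            raise RuntimeError(
                "reset_loopend_storage called before set_up_port_storage_writer"
            )
        # The real method would close the writers, recreate the result and
        # state documents, and re-open the writer here, as in the PR.
        self.resets.append((port_ids[0], self._storage_uri_base))
```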



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_storage(self) -> None:

Review Comment:
   The method name `reset_storage` is too generic for a behavior that only 
makes sense for LoopEnd operators (it truncates this worker's result and state 
iceberg tables). The block comment at the constructor even calls it 
`reset_loopend_storage`, matching the PR description. Renaming the public 
method to `reset_loopend_storage` (or similar) would make call sites 
self-documenting and avoid suggesting general-purpose use.



##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala:
##########
@@ -66,7 +67,12 @@ class WorkflowExecutionService(
 ) extends SubscriptionManager
     with LazyLogging {
 
-  workflowContext.workflowSettings = request.workflowSettings
+  workflowContext.workflowSettings =
+    if (request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc])) {
+      request.workflowSettings.copy(executionMode = ExecutionMode.MATERIALIZED)
+    } else {
+      request.workflowSettings
+    }

Review Comment:
   The coercion to `MATERIALIZED` happens only when a `LoopStartOpDesc` is 
present, but `LoopEndOpDesc` is what actually requires the iceberg output to 
persist across region invocations (see `RegionExecutionCoordinator`'s 
`isLoopEnd` branch). A workflow with a `LoopEndOpDesc` but no `LoopStartOpDesc` 
(e.g., malformed, partially-edited, or in a future use of LoopEnd as a 
standalone control op) would silently slip through pipelined mode. Consider 
also checking for `LoopEndOpDesc` to keep both halves in sync.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
