Xiao-zhen-Liu commented on code in PR #4206:
URL: https://github.com/apache/texera/pull/4206#discussion_r3285892233
##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala:
##########
@@ -66,7 +67,12 @@ class WorkflowExecutionService(
) extends SubscriptionManager
with LazyLogging {
- workflowContext.workflowSettings = request.workflowSettings
+ workflowContext.workflowSettings =
+ if (request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc]))
{
Review Comment:
Two issues:
- The execution service uses `isInstanceOf[LoopStartOpDesc]` to detect
loops, which makes a generic service depend on a specific operator class.
- This rule (force MATERIALIZED if a LoopStart is in the plan) has no tests.
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-",
1)[1].rsplit(
+ "-main-0", 1
+ )[0]
Review Comment:
`worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0]` recovers the operator
ID by chopping up the worker-name string. This depends on the exact
worker-naming format. If the format ever changes — an extra dash, a different
suffix, more than one worker per operator — the wrong ID is extracted silently,
and the back-jump goes to the wrong operator with no error.
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-",
1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
+ )
+
+ def _jump_to_loop_start(
+ self, executor: LoopEndOperator, controller_interface
+ ) -> None:
+ state = executor.state
+ controller_interface.jump_to_operator_region(
+ JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+ )
+ uri = state["LoopStartStateURI"]
+ # Strip the per-iteration scratch (`table`, `output`) and the
+ # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
+ # user-visible loop state is written back to LoopStart's input.
+ for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+ state.pop(key, None)
Review Comment:
`_attach_loop_start_id` and `_jump_to_loop_start` are the most fragile new
methods — worker-name parsing, first-port assumption, state-key stripping,
direct iceberg write — and they have no tests.
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-",
1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
Review Comment:
`next(iter(reader_runnables.values()))[0].uri` picks whichever input port
happens to be first, with no check that there is only one. If LoopStart ever
has more than one input, this silently picks one.
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-",
1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
+ )
+
+ def _jump_to_loop_start(
+ self, executor: LoopEndOperator, controller_interface
+ ) -> None:
+ state = executor.state
+ controller_interface.jump_to_operator_region(
+ JumpToOperatorRegionRequest(OperatorIdentity(state["LoopStartId"]))
+ )
+ uri = state["LoopStartStateURI"]
+ # Strip the per-iteration scratch (`table`, `output`) and the
+ # loop metadata (`LoopStartId`, `LoopStartStateURI`) so only the
+ # user-visible loop state is written back to LoopStart's input.
+ for key in ("table", "output", "LoopStartId", "LoopStartStateURI"):
+ state.pop(key, None)
+ writer = DocumentFactory.create_document(uri, State.SCHEMA).writer("0")
+ writer.put_one(State(state).to_tuple())
+ writer.close()
+
def complete(self) -> None:
"""
Complete the DataProcessor, marking state to COMPLETED, and notify the
controller.
"""
# flush the buffered console prints
self._check_and_report_console_messages(force_flush=True)
- self.context.executor_manager.executor.close()
+ controller_interface = self._async_rpc_client.controller_stub()
+ executor = self.context.executor_manager.executor
+ if isinstance(executor, LoopEndOperator) and executor.condition():
+ self._jump_to_loop_start(executor, controller_interface)
Review Comment:
Two issues:
- `condition()` runs the user's Python expression with no error handling. If
it throws (typo, wrong variable, divide-by-zero), `executor.close()` is
skipped, the worker never finishes its state transition, and the workflow
hangs. Same applies to `update`, `initialization`, `output`.
- Nothing stops an infinite loop. If the user's `condition` never returns
False, the workflow runs forever with no iteration counter and no progress log.
##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort,
PhysicalOp}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+
+class LoopStartOpDesc extends LogicalOp {
+ @JsonProperty(required = true, defaultValue = "i = 0")
+ @JsonSchemaTitle("Initialization")
+ var initialization: String = _
+
+ @JsonProperty(required = true, defaultValue = "table.iloc[i]")
+ @JsonSchemaTitle("Output")
+ var output: String = _
Review Comment:
User expressions execute against `self.state`, which the runtime also uses
to store `loop_counter`, `table`, `output`, `LoopStartId`, `LoopStartStateURI`.
A user writing `loop_counter = 0` in their `initialization`, or `table = ...`,
silently overwrites loop machinery — with no warning. Same applies to `update`
/ `condition` in `LoopEndOpDesc`.
##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala:
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.core.executor.OpExecWithCode
+import org.apache.texera.amber.core.virtualidentity.{ExecutionIdentity,
WorkflowIdentity}
+import org.apache.texera.amber.core.workflow.{InputPort, OutputPort,
PhysicalOp}
+import org.apache.texera.amber.operator.LogicalOp
+import org.apache.texera.amber.operator.metadata.{OperatorGroupConstants,
OperatorInfo}
+
+class LoopStartOpDesc extends LogicalOp {
+ @JsonProperty(required = true, defaultValue = "i = 0")
+ @JsonSchemaTitle("Initialization")
+ var initialization: String = _
+
+ @JsonProperty(required = true, defaultValue = "table.iloc[i]")
+ @JsonSchemaTitle("Output")
+ var output: String = _
+
+ override def getPhysicalOp(
+ workflowId: WorkflowIdentity,
+ executionId: ExecutionIdentity
+ ): PhysicalOp =
+ PhysicalOp
+ .oneToOnePhysicalOp(
+ workflowId,
+ executionId,
+ operatorIdentifier,
+ OpExecWithCode(generatePythonCode(), "python")
+ )
+ .withInputPorts(operatorInfo.inputPorts)
+ .withOutputPorts(operatorInfo.outputPorts)
+ .withSuggestedWorkerNum(1)
+ .withParallelizable(false)
+
+ override def operatorInfo: OperatorInfo =
+ OperatorInfo(
+ "Loop Start",
+ "Begin a loop that iterates over rows of the input table; pairs with
Loop End.",
+ OperatorGroupConstants.CONTROL_GROUP,
+ inputPorts = List(InputPort()),
+ outputPorts = List(OutputPort())
+ )
+
+ def generatePythonCode(): String = {
+ s"""
+ |from pytexera import *
+ |class ProcessLoopStartOperator(LoopStartOperator):
+ | @overrides
+ | def open(self):
+ | self.state = {"loop_counter": 0}
+ | exec("$initialization", {}, self.state)
+ |
+ | @overrides
Review Comment:
The user's expression is dropped into a string literal:
`exec("$initialization", {}, self.state)`. With `initialization = 'name =
"foo"'`, the generator produces `exec("name = "foo"", ...)` — invalid Python.
The error doesn't surface until the operator's class loads at run time, far
from where the user typed the expression. Multi-line input has the same
problem. The same pattern appears in `LoopEndOpDesc.scala:78-83`.
##########
amber/src/main/python/core/runnables/main_loop.py:
##########
@@ -87,19 +92,54 @@ def __init__(
target=self.data_processor.run, daemon=True,
name="data_processor_thread"
).start()
+ def _attach_loop_start_id(self, output_state: State) -> None:
+ if "LoopStartId" in output_state:
+ return
+ output_state["LoopStartId"] = self.context.worker_id.split("-",
1)[1].rsplit(
+ "-main-0", 1
+ )[0]
+ # The URI lives on the upstream operator's output port (which
+ # LoopStart's first materialization reader is reading from).
+ reader_runnables = (
+ self.context.input_manager.get_input_port_mat_reader_threads()
+ )
+ output_state["LoopStartStateURI"] = VFSURIFactory.state_uri(
+ next(iter(reader_runnables.values()))[0].uri
+ )
Review Comment:
This writes `LoopStartStateURI` — a storage path — into the state object
that flows through every operator in the loop body, including user UDFs. The
URI is internal runtime data; user code shouldn't see it or be able to write to
it. LoopEnd already has the operator ID, so the URI doesn't need to travel
through state.
##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -217,6 +225,19 @@ def save_state_to_storage_if_needed(self, state: State,
port_id=None) -> None:
elif port_id in self._port_state_writers:
self._port_state_writers[port_id][0].put(element)
+ def reset_storage(self) -> None:
Review Comment:
`reset_storage` has no description. The name doesn't say what the method
actually does, which is to delete and recreate two iceberg tables. It lives on
`OutputManager`, a general class, but the only caller is a single branch in
`main_loop.py:237` — neither the method's name nor its location hint at that.
It assumes `set_up_port_storage_writer` was called first and that the operator
has exactly one output port; neither is checked. It has no tests.
The PR description ("Truncate LoopEnd's iceberg tables at each iteration
boundary") makes it sound like a class-wide property of LoopEnd. The method is
actually called only from one runtime path; in a nested loop with multiple
LoopEnds, not every LoopEnd resets on every invocation.
The reason this is correct — downstream readers are paused because the
output mode is MATERIALIZED, so they only read after the loop finishes — lives
only in the PR description.
##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]
time, or None.
"""
yield
+
+
+class LoopStartOperator(TableOperator):
+ @overrides.final
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ if "LoopStartStateURI" in state:
+ state["loop_counter"] += 1
Review Comment:
On the nested-loop branch, this mutates the dict it was passed
(`state["loop_counter"] += 1`) and returns it. The runtime, not this operator,
owns that dict.
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala:
##########
@@ -198,6 +198,7 @@ case class PhysicalOp(
// schema propagation function
propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas =>
schemas),
isOneToManyOp: Boolean = false,
+ isLoopEnd: Boolean = false,
Review Comment:
`isLoopEnd` names the flag after one specific operator type. What the
scheduler actually checks is more general — "this operator's output must
survive across re-runs of its region." If anything else ever needs the same
behavior, we'll either reuse a misnamed flag or add a near-duplicate.
##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]
time, or None.
"""
yield
+
+
+class LoopStartOperator(TableOperator):
+ @overrides.final
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ if "LoopStartStateURI" in state:
+ state["loop_counter"] += 1
+ return state
+ self.state.update(state)
+ return None
+
+ @overrides.final
+ def produce_state_on_finish(self, port: int) -> State:
+ from pickle import dumps
+
+ self.state["table"] =
dumps(Table(self._TableOperator__table_data[port]))
Review Comment:
Two issues on this line:
- `self._TableOperator__table_data[port]` reads a parent class's private
field by writing out its name-mangled form. This depends on the parent being
named exactly `TableOperator`; renaming the parent silently breaks this.
- The table is pickled to bytes, then stored inside a state dict that is
serialized as a JSON string (the `State` schema is `{CONTENT: STRING}`). The
table makes a `pickle → bytes → JSON-string → iceberg` trip every iteration.
`pickle.loads` of data anyone can write to is a remote-code-execution surface.
##########
amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala:
##########
@@ -66,7 +67,12 @@ class WorkflowExecutionService(
) extends SubscriptionManager
with LazyLogging {
- workflowContext.workflowSettings = request.workflowSettings
+ workflowContext.workflowSettings =
+ if (request.logicalPlan.operators.exists(_.isInstanceOf[LoopStartOpDesc]))
{
+ request.workflowSettings.copy(executionMode = ExecutionMode.MATERIALIZED)
+ } else {
+ request.workflowSettings
Review Comment:
When the workflow contains a Loop Start, this block forces the execution
mode to MATERIALIZED. The frontend setting still displays whatever the user
picked. The UI and the running system disagree, and the user has no way to know.
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +576,17 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is
missing"))
- DocumentFactory.createDocument(resultURI, schema)
- DocumentFactory.createDocument(stateURI, State.schema)
+ // LoopEnd operators may re-execute the region multiple times; on
+ // subsequent iterations the result/state documents already exist,
+ // and `createDocument` (overrideIfExists=true) would clobber them.
+ // Skip the create call when the document is already there.
+ val isLoopEndRegion = region.getOperators.exists(_.isLoopEnd)
+ if (!isLoopEndRegion || !DocumentFactory.documentExists(resultURI)) {
+ DocumentFactory.createDocument(resultURI, schema)
+ }
+ if (!isLoopEndRegion || !DocumentFactory.documentExists(stateURI)) {
+ DocumentFactory.createDocument(stateURI, State.schema)
+ }
Review Comment:
The skip-create branch has no tests. Without this change, loops would wipe
their own output every iteration — so this is a load-bearing invariant with no
coverage.
##########
amber/src/main/python/core/models/operator.py:
##########
@@ -291,3 +291,30 @@ def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]
time, or None.
"""
yield
+
+
+class LoopStartOperator(TableOperator):
+ @overrides.final
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ if "LoopStartStateURI" in state:
+ state["loop_counter"] += 1
+ return state
+ self.state.update(state)
+ return None
+
+ @overrides.final
+ def produce_state_on_finish(self, port: int) -> State:
+ from pickle import dumps
+
+ self.state["table"] =
dumps(Table(self._TableOperator__table_data[port]))
+ return dict(self.state)
+
+
+class LoopEndOperator(TableOperator):
+ @overrides.final
+ def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
+ yield table
+
+ @abstractmethod
+ def condition(self) -> bool:
Review Comment:
`LoopEndOperator` doesn't itself declare `process_state`, but the generator
overrides `process_state` anyway. To understand what either loop operator does,
you have to read three files together: the Scala generator template, this base
class, and its parents. The rules that tie them together — which method runs
when, which keys are reserved (`loop_counter`, `table`, `output`,
`LoopStartId`, `LoopStartStateURI`), what `self.state` must contain by the time
`open()` returns — are not encoded anywhere as code; they're string conventions
shared across files.
##########
amber/src/test/python/core/models/test_loop_operators.py:
##########
@@ -0,0 +1,424 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Unit tests for the loop runtime: LoopStartOperator and LoopEndOperator.
+
+These exercise the abstract base classes in operator.py that the
+generated `ProcessLoopStartOperator` / `ProcessLoopEndOperator` classes
+extend. The tests use minimal stub subclasses that mirror what
+`LoopStartOpDesc.generatePythonCode` / `LoopEndOpDesc.generatePythonCode`
+emit so the behavior covered here is the same shape that ships at
+runtime.
+
+Single-loop coverage:
+ - LoopStart's first-time state observation (merge into self.state).
+ - LoopEnd's process_table is the identity yield.
+ - End-to-end one-iteration loop driven through the matching-loop branch.
+
+Nested-loop coverage:
+ - LoopStart.process_state with `LoopStartStateURI` already present
+ must increment `loop_counter` and pass the state through downstream
+ (this is what makes inner LoopStart not consume outer-loop state).
+ - LoopEnd's generated process_state, when `loop_counter > 0`, must
+ decrement and return the state unchanged so the outer LoopEnd is
+ the one that runs the user's update / condition.
+ - Round-trip outer × inner loop preserves the nesting invariant
+ (loop_counter is symmetric across LoopStart/LoopEnd traversals).
+"""
+
+from pickle import loads
+from typing import Iterator, Optional
+
+import pytest
+
+from core.models import State, Table, TableLike, Tuple
+from core.models.operator import LoopEndOperator, LoopStartOperator
+
+
+# ---------------------------------------------------------------------------
+# Stub subclasses that mirror the generated Python in
+# LoopStart/LoopEnd OpDesc. Keeping them here (rather than reusing the
+# real generator) lets the test pin behavior without spinning up a Scala
+# runtime to produce code.
+# ---------------------------------------------------------------------------
+
+
+class _StubLoopStart(LoopStartOperator):
+ """Mirrors `ProcessLoopStartOperator` from LoopStartOpDesc codegen.
+
+ open() seeds `loop_counter` to 0 and runs the user's `initialization`.
+ process_table runs the user's `output` expression and yields the
+ result for downstream.
+ """
+
+ def __init__(self, initialization="i = 0", output_expr="table.iloc[i]"):
+ super().__init__()
+ self._initialization = initialization
+ self._output_expr = output_expr
+
+ def open(self) -> None:
+ self.state = {"loop_counter": 0}
+ exec(self._initialization, {}, self.state)
+
+ def process_table(self, table: Table, port: int) ->
Iterator[Optional[TableLike]]:
+ self.state["table"] = table
+ exec(f"output = {self._output_expr}", {}, self.state)
+ yield self.state["output"]
+
+
+class _StubLoopEnd(LoopEndOperator):
+ """Mirrors `ProcessLoopEndOperator` from LoopEndOpDesc codegen.
+
+ process_state recognises the nested-loop pass-through path
+ (`loop_counter > 0`) and decrements; on the matching-loop branch
+ it stashes the state, deserializes the pickled table, and runs the
+ user's `update`. condition() returns the boolean result of the
+ user's `condition` expression evaluated in self.state.
+ """
+
+ def __init__(self, update="i += 1", condition_expr="i < 3"):
+ super().__init__()
+ self._update = update
+ self._condition_expr = condition_expr
+ self.state = {}
+
+ def process_state(self, state: State, port: int) -> Optional[State]:
+ loop_counter = int(state.get("loop_counter", 0))
+ if loop_counter > 0:
+ state["loop_counter"] = loop_counter - 1
+ return state
+ self.state = dict(state)
+ self.state["table"] = loads(self.state["table"])
+ exec(self._update, {}, self.state)
+ return None
+
+ def condition(self) -> bool:
+ exec(f"output = {self._condition_expr}", {}, self.state)
+ return self.state["output"]
+
+
+# ---------------------------------------------------------------------------
+# LoopStartOperator — process_state
+# ---------------------------------------------------------------------------
+
+
+class TestLoopStartProcessState:
+ def
test_first_time_state_is_merged_into_self_state_and_none_is_returned(self):
+ # First entry: state from upstream (no LoopStartStateURI). The
+ # base class must merge it into self.state and return None so
+ # nothing flows downstream of LoopStart until the table is in.
+ op = _StubLoopStart()
+ op.open()
+ op.state["i"] = 0 # simulate the user's initialization
+
+ result = op.process_state(State({"upstream_key": "v"}), port=0)
+
+ assert result is None, "first-time state must not be forwarded"
+ assert op.state["upstream_key"] == "v", "state was not merged into
self.state"
+ # loop_counter is left at its open() value (0) on first entry.
+ assert op.state["loop_counter"] == 0
+
+ def test_reentry_state_with_loop_start_uri_increments_loop_counter(self):
+ # Re-entry from this LoopStart's own LoopEnd: the state carries
+ # LoopStartStateURI, so the base class must INCREMENT
+ # loop_counter and PASS THROUGH the state downstream. This is
+ # what main_loop's _attach_loop_start_id relies on.
+ op = _StubLoopStart()
+ op.open()
+ incoming = State({"LoopStartStateURI": "vfs:///x", "loop_counter": 0,
"i": 2})
+
+ result = op.process_state(incoming, port=0)
+
+ assert result is not None, "re-entry state must be returned for
downstream"
+ assert result["loop_counter"] == 1
+ # The user variable rides along.
+ assert result["i"] == 2
+
+ def test_reentry_at_nested_loop_counter_bumps_one(self):
+ # Nested loop: an outer loop's re-entry state passes through this
+ # inner LoopStart with a loop_counter already > 0 (because the
+ # outer LoopStart bumped it on its own re-entry first). The
+ # invariant is that we only ever +1, never reset.
+ op = _StubLoopStart()
+ op.open()
+ incoming = State({"LoopStartStateURI": "vfs:///outer", "loop_counter":
5})
+
+ result = op.process_state(incoming, port=0)
+
+ assert result["loop_counter"] == 6
+
+
+# ---------------------------------------------------------------------------
+# LoopStartOperator — produce_state_on_finish
+# ---------------------------------------------------------------------------
+
+
+class TestLoopStartProduceStateOnFinish:
+ def test_pickles_buffered_table_into_state_table_field(self):
+ # produce_state_on_finish must serialize the buffered table via
+ # pickle (so the cross-region state stream can carry a heavy
+ # pandas DataFrame as bytes). The receiving LoopEnd unpickles
+ # it on the matching-loop branch.
+ op = _StubLoopStart()
+ op.open()
+ # Drive a couple of tuples through to populate the per-port buffer.
+ list(op.process_tuple(Tuple({"v": 1}), port=0))
+ list(op.process_tuple(Tuple({"v": 2}), port=0))
+
+ produced = op.produce_state_on_finish(port=0)
+
+ assert isinstance(produced, dict)
+ assert "table" in produced
+ assert isinstance(produced["table"], bytes), "table must be pickled
bytes"
+ # Round-trip through pickle.loads must give back our two tuples.
+ unpickled = loads(produced["table"])
+ assert isinstance(unpickled, Table)
+ rows = list(unpickled.as_tuples())
+ assert rows == [Tuple({"v": 1}), Tuple({"v": 2})]
+
+ def test_user_state_fields_survive_into_produced_state(self):
+ # Any vars the user set in open() (e.g. i, accumulators) must
+ # ride along in the produced state so LoopEnd can run the user's
+ # `update` expression against them.
+ op = _StubLoopStart(initialization="i = 0; acc = []")
+ op.open()
+ list(op.process_tuple(Tuple({"v": 1}), port=0))
+
+ produced = op.produce_state_on_finish(port=0)
+
+ assert produced["i"] == 0
+ assert produced["acc"] == []
+ assert produced["loop_counter"] == 0
+
+
+# ---------------------------------------------------------------------------
+# LoopEndOperator — base class behaviour
+# ---------------------------------------------------------------------------
+
+
+class TestLoopEndBase:
+ def test_process_table_yields_input_table_unchanged(self):
+ # The base class finalizes process_table to a single identity
+ # yield. The user only ever overrides condition() and (via
+ # codegen) process_state.
+ op = _StubLoopEnd()
+ in_table = Table([Tuple({"x": 1}), Tuple({"x": 2})])
+ out = list(op.process_table(in_table, port=0))
+ assert out == [in_table]
+
+ def test_condition_is_abstract_on_base_class(self):
Review Comment:
`pytest.raises(TypeError, match="condition")` matches on Python's "missing
abstract method" error text. That wording has changed between CPython versions
before. Match on `"abstract"` instead, or drop `match`.
##########
amber/src/test/python/core/models/test_loop_operators.py:
##########
@@ -0,0 +1,424 @@
+# Licensed to the Apache Software Foundation (ASF) under one
Review Comment:
Verify CI picks up this path. Other Python sources are under
`amber/src/main/python/...`; if `amber/src/test/python/...` isn't included in
the runner config, these tests silently don't run.
##########
common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala:
##########
@@ -0,0 +1,152 @@
+/*
Review Comment:
Two test-suite issues:
- Nothing in the tests actually compiles the generator's output. The Scala
specs check that the generated string contains expected substrings; the Python
tests use hand-written stub classes that mimic what the generator should
produce. So a quote or newline in user input that breaks the codegen
(`LoopStartOpDesc.scala:70-73` / `LoopEndOpDesc.scala:78-83`) passes both test
layers and only fails at run time.
- `LoopStartOpDescSpec` and `LoopEndOpDescSpec` duplicate about 80% of their
scaffolding.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]