aglinxinyuan commented on PR #5700:
URL: https://github.com/apache/texera/pull/5700#issuecomment-4700160611

   ## Review history (migrated from #4206)
   
   This PR was re-opened from a fork (see the description). The code-review 
discussion happened on the original PR #4206 — reproduced below with the 
**original author and date** so it isn't lost. The canonical threads (with full 
inline diff context) remain on 
[#4206](https://github.com/apache/texera/pull/4206). _Auto-generated bot 
comments (Codecov coverage, CI summaries) are omitted — they regenerate on this 
PR._
   
   <details>
   <summary><b>Conversation comments (8)</b></summary>
   
   **@Xiao-zhen-Liu** — 2026-05-19:
   > @aglinxinyuan Can you upload the workflows used for testing in the PR 
description?
   
   **@aglinxinyuan** — 2026-05-19:
   > > @aglinxinyuan Can you upload the workflows used for testing in the PR 
description?
   >
   > Updated.
   
   **@aglinxinyuan** — 2026-05-19:
   > I plan to add test cases on a separate PR. What do you think?
   
   **@Xiao-zhen-Liu** — 2026-05-20:
   > > I plan to add test cases on a separate PR. What do you think?
   >
   > I think it makes more sense to include test cases in this PR. Usually test 
cases are not good candidates for splitting into a future PR.
   
   **@chenlica** — 2026-05-20:
   > > > I plan to add test cases on a separate PR. What do you think?
   > > 
   > > I think it makes more sense to include test cases in this PR. Usually 
test cases are not good candidates for splitting into a future PR.
   >
   > Agreed.
   
   **@aglinxinyuan** — 2026-05-20:
   > Sure, I’ll add test cases in this PR.
   >
   > For reference, before adding the test cases, this PR already contains 302 
lines of changes across 12 files.
   
   **@aglinxinyuan** — 2026-06-09:
   > @Xiao-zhen-Liu, please review the PR again.
   
   **@aglinxinyuan** — 2026-06-13:
   > Superseded by #5700, which re-opens this from my fork 
(aglinxinyuan/texera) to satisfy the requirement that contributions come from a 
fork rather than a branch on the main repo. The code is identical and the 
labels/CI carry over. Continuing on #5700 — the full review discussion here 
remains for reference. Thanks!
   
   </details>
   
   <details>
   <summary><b>Inline review threads — 38 threads, 67 comments</b></summary>
   
   ---
   **`amber/src/main/python/core/architecture/packaging/output_manager.py` 
L228** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435610)
   
   **@Copilot** — 2026-05-18:
   > The method name `reset_storage` is too generic for a behavior that only 
makes sense for LoopEnd operators (it truncates this worker's result and state 
iceberg tables). The block comment at the constructor even calls it 
`reset_loopend_storage`, matching the PR description. Renaming the public 
method to `reset_loopend_storage` (or similar) would make call sites 
self-documenting and avoid suggesting general-purpose use.
   
   ---
   **`amber/src/main/python/core/architecture/packaging/output_manager.py` 
L228** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892254)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `reset_storage` has no description. The name doesn't say what the method 
actually does, which is to delete and recreate two iceberg tables. It lives on 
`OutputManager`, a general class, but the only caller is a single branch in 
`main_loop.py:237` — neither the method's name nor its location hint at that. 
It assumes `set_up_port_storage_writer` was called first and that the operator 
has exactly one output port; neither is checked. It has no tests.
   >
   > The PR description ("Truncate LoopEnd's iceberg tables at each iteration 
boundary") makes it sound like a class-wide property of LoopEnd. The method is 
actually called only from one runtime path; in a nested loop with multiple 
LoopEnds, not every LoopEnd resets on every invocation.
   >
   > The reason this is correct — downstream readers are paused because the 
output mode is MATERIALIZED, so they only read after the loop finishes — lives 
only in the PR description.
   
   **@aglinxinyuan** — 2026-05-27:
   > Addressed in e6bea518f2. (The method is now `reset_output_storage` after 
an earlier rename, and on the current branch it recreates just the one output 
result table — the state table is handled separately in 
`save_state_to_storage_if_needed`.)
   >
   > * **Docstring**: it now says what the method does (drop + recreate the 
single output table, bracketed by closing the old writer and opening a fresh 
one), that it is called only by a Loop End worker once per iteration, and — the 
reasoning that previously lived only in the PR description — *why* truncating 
live storage is safe: a loop runs in MATERIALIZED mode, so downstream operators 
don't read the table until the loop has finished, so no reader observes the 
intermediate truncation.
   > * **Preconditions checked**: the two previously-implicit assumptions now 
raise a clear `RuntimeError` instead of silently resetting the wrong port / 
raising a bare `KeyError` — (1) exactly one output port, (2) 
`set_up_port_storage_writer` already ran for it.
   > * **Tests**: new `test_output_manager.py` covers the happy path (recreate 
bracketed by close→reopen) and both guard failures, with the iceberg/thread 
collaborators mocked so it stays hermetic.
   >
   > On the location/naming: it stays on `OutputManager` because it operates 
entirely on that class's private writer/URI state; the docstring now makes the 
single-caller, Loop-End-only usage explicit so the general location doesn't 
mislead.
   
   ---
   **`amber/src/main/python/core/architecture/packaging/output_manager.py` 
L237** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851469)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > **Resolved thread 17, but the fix isn't in the branch.** The reply 
described a rename to `reset_output_storage`, a docstring, and guards (commit 
`e6bea518f2`), but the method here is still `reset_storage`, undocumented, with 
no checks. The reason truncation is safe (downstream is paused while in 
MATERIALIZED mode) lives only in the PR description. Please re-open, or point 
me at the commit if it was lost.
   
   **@aglinxinyuan** — 2026-06-12:
   > You're right — that earlier work was lost when the branch was 
rebased/squashed. Re-applied on the current branch in 2095b592ee.
   >
   > * **Renamed** `reset_storage` → `reset_output_storage` (and updated the 
caller in `main_loop.py` plus the two `__init__` / `set_up_port_storage_writer` 
comments that already referenced the intended name).
   > * **Docstring** now states what it does (drop + recreate the result AND 
state tables, then reopen the writers), that it's called only by a Loop End 
worker once per iteration, and — the part that previously lived only in the PR 
description — **why truncating live storage is safe**: a loop forces 
MATERIALIZED execution mode, so downstream operators don't begin reading this 
output until the loop region has fully completed; no reader can observe an 
intermediate truncation.
   > * **Guards**: the two previously-implicit preconditions now raise a clear 
`RuntimeError` instead of silently resetting the wrong port or dereferencing 
`None` — (1) exactly one output port, (2) `set_up_port_storage_writer` already 
ran (`_storage_uri_base` populated).
   > * **Tests**: new `TestResetOutputStorage` in `test_output_manager.py` 
covers the happy path (close → recreate result+state docs → reopen writer) and 
both guard failures, with the iceberg/thread collaborators mocked.
   >
   > 9/9 `test_output_manager.py` and 24/24 `test_main_loop.py` tests green.
   
   ---
   **`amber/src/main/python/core/architecture/packaging/output_manager.py` 
L239** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435585)
   
   **@Copilot** — 2026-05-18:
   > `reset_storage` blindly dereferences `self._storage_uri_base` and 
`self.get_port_ids()[0]`. If it is ever invoked before 
`set_up_port_storage_writer` has run (e.g., a LoopEnd that received state 
without yet having its output writer provisioned, or an operator with zero 
output ports), this will raise `TypeError`/`IndexError` with no context. Add a 
guard (or assertion with a descriptive error) and consider asserting that the 
executor really is a LoopEnd at the call site so this state-mutation is scoped 
to where it is meaningful.
   
   ---
   **`amber/src/main/python/core/architecture/packaging/output_manager.py` 
L260** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851478)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > **Possible data loss.** This recreates both the result and state tables 
every iteration (`override_if_exists=True`), but 
`RegionExecutionCoordinator.scala:579-589` deliberately does *not* recreate 
LoopEnd's documents on a re-run — its comment says recreating them "would erase 
what we just wrote." The two paths look contradictory; how is the accumulated 
output not erased? (Also: the description says this only runs for the inner 
LoopEnd of a nested loop, but `main_loop.py:267` calls it for any LoopEnd with 
output state.)
   
   **@aglinxinyuan** — 2026-06-13:
   > Good catch — this is a real latent bug, deeper than the apparent 
contradiction. Fixed in e61681d695.
   >
   > The two paths don't actually conflict at runtime, because **the Python 
reset never fires**: a Loop End's generated `process_state` returns `None` (and 
`produce_state_on_finish` isn't overridden, so it's `None` too), so 
`output_state` is always `None` for a Loop End — and `reset_output_storage()` 
sat under `if output_state is not None:`. On top of that it was hooked in 
`process_input_state` (the consume path, `loop_counter == 0`), not the outer 
pass-through (`loop_counter > 0`) where it belongs. So your parenthetical was 
sharper than it looked: not only is the description's "inner Loop End only" 
claim not matched by the code — the call was effectively dead for *every* Loop 
End.
   >
   > So today every Loop End just **accumulates**, which is actually correct 
for a single / outermost loop (the scheduler's 
`reusesOutputStorageOnReExecution` keeps the doc across re-runs and the writer 
appends — that's the `RegionExecutionCoordinator` "would erase what we just 
wrote" path, and it's right). The gap is the **nested** case: an inner Loop End 
should accumulate only within the current outer iteration and reset when the 
outer loop advances. With the reset dead, the inner Loop End accumulated across 
all outer iterations (9 rows in the 3×3 case instead of 3).
   >
   > Also — my earlier description edit (the one that prompted this) was wrong: 
I'd "corrected" the description toward the buggy code. The *original* wording 
(inner Loop End of a nested loop, `loop_counter > 0` pass-through) described 
the intent correctly, and I've restored it.
   >
   > **Fix:**
   > - Move `reset_output_storage()` to the inner-Loop-End pass-through branch 
in `_process_state_frame` (`loop_counter > 0`). The input reader replays all 
states before any data each region execution, so the tables still hold the 
*previous* outer iteration's rows when the outer boundary state passes through 
— clearing there makes each outer iteration accumulate from empty.
   > - It fires exactly **once per outer iteration**: each loop operator is its 
own region, so the inner Loop Start's region doesn't carry 
`reusesOutputStorageOnReExecution` and its output is recreated on every inner 
back-edge — the outer pass-through therefore only reaches the inner Loop End on 
the first inner iteration of each outer iteration. A single / outermost Loop 
End never sees `loop_counter > 0`, so it never resets.
   > - Removed the dead consume-path call; corrected the `reset_output_storage` 
docstring, the call-site comment, and the `RegionExecutionCoordinator` comment. 
Scala side unchanged — it provides the base per-loop accumulation this reset 
carves the nested exception out of.
   >
   > **Tests:**
   > - Unit (`test_main_loop`): the inner pass-through triggers 
`reset_output_storage` once and doesn't invoke the operator; the consume path 
(single loop) and a Loop Start pass-through never reset.
   > - Integration (`LoopIntegrationSpec`): assert the **materialized** result 
row counts — single loop = 3 (accumulate), nested inner Loop End = 3 (not 9), 
nested outer Loop End = 9. The pre-existing cumulative output-tuple counts 
can't distinguish accumulate from reset, which is why this slipped through. 
Verified locally via the unit tests + ruff; the materialized integration 
assertions run in the `amber-integration` CI job.
   
   ---
   **`amber/src/main/python/core/models/operator.py` L300** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435497)
   
   **@Copilot** — 2026-05-18:
   > `LoopStartOperator.process_state` does `state["loop_counter"] += 1` when 
the incoming state is from an outer scope. This silently assumes the incoming 
state already contains a `loop_counter` key, which is only true if the outer 
scope was also produced by a `LoopStart` whose `open()` initialized it. Any 
non-loop upstream that supplies a state carrying `LoopStartStateURI` (or an 
outer LoopStart whose user `initialization` happens not to define 
`loop_counter`) will raise `KeyError` at runtime. Either default-initialize via 
`state.get("loop_counter", 0) + 1` or assert the precondition with a clear 
error.
   
   ---
   **`amber/src/main/python/core/models/operator.py` L300** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892258)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > On the nested-loop branch, this mutates the dict it was passed 
(`state["loop_counter"] += 1`) and returns it. The runtime, not this operator, 
owns that dict.
   
   **@aglinxinyuan** — 2026-06-01:
   > Addressed by moving `loop_counter` out of the `State` content dict 
entirely (latest: 63d243353). The loop operators never read or mutate it. It 
rides on the `StateFrame` transport envelope and the worker runtime owns it: 
`main_loop._process_state_frame` applies the `+1`/`-1` and handles the 
LoopStart/LoopEnd nested pass-through before the operator runs (so the 
generated LoopEnd is now consume-only). It is materialized/serialized as its 
own `loop_counter` column parallel to `content`: `State.SCHEMA` is the 
two-column schema and `State.to_tuple(loop_counter)` writes both columns, while 
`from_tuple` returns the bare State (the readers that need the counter read the 
column directly). The user state JSON stays clean. Operator-level counter 
coverage was relocated to `main_loop` runtime tests.
   
   ---
   **`amber/src/main/python/core/models/operator.py` L309** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892259)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > Two issues on this line:
   > - `self._TableOperator__table_data[port]` reads a parent class's private 
field by writing out its name-mangled form. This depends on the parent being 
named exactly `TableOperator`; renaming the parent silently breaks this.
   > - The table is pickled to bytes, then stored inside a state dict that is 
serialized as a JSON string (the `State` schema is `{CONTENT: STRING}`). The 
table makes a `pickle → bytes → JSON-string → iceberg` trip every iteration. 
`pickle.loads` of data anyone can write to is a remote-code-execution surface.
   
   **@aglinxinyuan** — 2026-06-04:
   > Both issues fixed in e281c61b4c.
   >
   > **1. Name-mangled access.** Added a protected 
`TableOperator._buffered_table(port)` accessor; inside the class 
`self.__table_data` resolves normally so a rename of `TableOperator` stays 
transparent. `LoopStartOperator.produce_state_on_finish` now goes through it 
instead of `self._TableOperator__table_data[port]`.
   >
   > **2. Pickle as RCE surface.** Swapped the bytes format from pickle to 
Apache Arrow IPC — structured + typed, no callable payload, parse errors raise 
at read time. Two new helpers in `core/models/table.py`:
   >
   > * `table_to_ipc_bytes(table) -> bytes` (sender side, used by Loop Start)
   > * `table_from_ipc_bytes(buf) -> Table` (receiver side, emitted by the 
codegen for Loop End)
   >
   > The codegen in `LoopEndOpDesc.scala` now emits `from core.models.table 
import table_from_ipc_bytes; self.state["table"] = 
table_from_ipc_bytes(self.state["table"])` in place of the prior `from pickle 
import loads` lines. The wire shape (bytes-in-`state["table"]`) is unchanged; 
only the format swaps.
   >
   > **Tests:**
   > * New `core/models/test_loop_operators.py` (8 tests, all green): pins the 
accessor, the Arrow IPC round-trip across mixed/single-row/empty tables, that 
the serialized bytes parse as an Arrow IPC stream (stronger than a 
pickle-prefix check), that malformed input raises at parse time, and the 
end-to-end Loop Start sender path.
   > * Extended `LoopOpDescsSpec`: asserts the generated Loop End source 
imports `table_from_ipc_bytes` and contains no `pickle` reference at all.
   >
   > Diff scoped to 5 files (operator.py, table.py, test_loop_operators.py, 
LoopEndOpDesc.scala, LoopOpDescsSpec.scala) — no unrelated churn. The 
URI-in-state alternative was considered but rejected as ~4× the diff with new 
cleanup plumbing; the surgical pickle→Arrow swap fully resolves both concerns.
   
   ---
   **`amber/src/main/python/core/models/operator.py` L310** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435541)
   
   **@Copilot** — 2026-05-18:
   > Reaching into `self._TableOperator__table_data` from a subclass relies on 
Python's private name-mangling and tightly couples `LoopStartOperator` to the 
internal storage detail of `TableOperator`. Any rename of 
`TableOperator.__table_data` (a private attribute, so legitimately renameable 
without notice) silently breaks loops. Consider exposing a protected accessor 
on `TableOperator` (e.g., `_get_table_data(port)`) and using it here, or store 
the pickled table inside `process_table` instead.
   
   ---
   **`amber/src/main/python/core/models/operator.py` L386** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851484)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > **Resolved thread 19, but the fix isn't in the branch.** The reply 
described a `TableOperator._buffered_table(port)` accessor and an Arrow 
replacement for pickle (commit `e281c61b4c`); neither is here. This still reads 
the parent's private field through the mangled name 
`self._TableOperator__table_data[port]` (which breaks silently if 
`TableOperator` is renamed), and the table is still moved with `pickle` 
(`loads` at line 444) — the remote-code-execution risk from the first review. 
Please re-open both.
   
   **@aglinxinyuan** — 2026-06-13:
   > Re-applied in 620edeb4fc (lost in a force-rebase). Both halves:
   >
   > **1. Name-mangled access → accessor.** Added 
`TableOperator._buffered_table(port)`; inside the class `self.__table_data` 
resolves via normal name mangling, so a rename of `TableOperator` stays 
transparent. `LoopStartOperator.produce_state_on_finish` now goes through it 
instead of `self._TableOperator__table_data[port]`.
   >
   > **2. Pickle → Apache Arrow IPC.** Both pickle uses are in `operator.py` 
base helpers on this tip (`dumps` in `produce_state_on_finish`, `loads` in 
`run_update`), so it's a pure Python-side fix — no codegen change. New 
`table_to_ipc_bytes` / `table_from_ipc_bytes` in `table.py` (built on 
`pyarrow.ipc`); the producer encodes, the consumer decodes. Arrow IPC is 
length-prefixed and schema-typed with no callable payload, so the 
`pickle.loads`-on-iceberg-data RCE surface is gone. The wire shape (bytes in 
`state["table"]`) is unchanged — only the format.
   >
   > **Tests:** `TestBufferedTableAccessor` pins the accessor (incl. per-port 
keying); the produce-state test asserts the bytes parse as an Arrow IPC stream 
(stronger than a no-pickle-prefix check) and round-trip back to the same 
tuples; the matching-branch tests feed Arrow bytes. 14 loop-operator tests 
pass; ruff clean.
   >
   > (The unrelated `test_tuple.py::test_hash` OSError on my machine is a 
pre-existing Windows issue in `tuple.py` hashing, not from this change.)
   
   ---
   **`amber/src/main/python/core/models/operator.py` L442** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851487)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > `eval_condition` reads `self._loop_table` and `self.state`, which are only 
created in `run_update`. `complete()` calls `condition()` for any LoopEnd. If a 
LoopEnd finishes without having consumed state (empty input, or an inner 
LoopEnd that only passed outer-loop state through), this raises 
`AttributeError`. Please confirm that can't happen, or initialize both in 
`__init__`.
   
   **@aglinxinyuan** — 2026-06-13:
   > Confirmed reachable, and fixed in 0b01d01261 (initialized in `__init__`, 
per your suggestion, plus a guard).
   >
   > `MainLoop.complete()` calls `condition()` on every LoopEnd, and 
`eval_condition` reads `self.state` / `self._loop_table`, which only 
`run_update` assigns. So a LoopEnd that finishes without consuming a matching 
state — an inner LoopEnd that only forwarded outer-loop pass-through state 
(`loop_counter > 0` is handled and returned in `_process_state_frame` before 
the operator runs), or a loop with no matching-branch consume — would raise 
`AttributeError`. The `_StubLoopEnd` test stub was hiding this by pre-seeding 
`self.state = {}` in its own `__init__`; the generated `ProcessLoopEndOperator` 
has no `__init__`/`open`, so it had no such default.
   >
   > Fix:
   > - `LoopEndOperator.__init__` now initializes `self.state = {}`, 
`self._loop_table = None`, and `self._consumed_state = False`. The generated 
operator inherits it.
   > - `run_update` sets `self._consumed_state = True` after the consume.
   > - `eval_condition` returns `False` when nothing has been consumed — the 
loop never iterated at this LoopEnd, so it must not fire the back-edge. Bare 
field init alone wasn't enough: `eval_condition` would otherwise `exec` the 
user's condition (e.g. `i < len(table)`) against an empty namespace and raise 
`NameError` on the undefined loop variable.
   >
   > Tests: unmasked `_StubLoopEnd` (dropped its `self.state = {}` so it 
mirrors the generated operator and exercises the base `__init__`); added 
`test_condition_returns_false_before_any_state_is_consumed` (your exact 
scenario — `condition()` with no prior consume returns `False`, no raise) and 
`test_consumed_flag_flips_after_run_update`. 16 loop-operator + 33 
main_loop/output_manager tests pass; no behavior change on the normal consume 
path.
   
   ---
   **`amber/src/main/python/core/models/operator.py` L452** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892263)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `LoopEndOperator` doesn't itself declare `process_state`, but the 
generator overrides `process_state` anyway. To understand what either loop 
operator does, you have to read three files together: the Scala generator 
template, this base class, and its parents. The rules that tie them together — 
which method runs when, which keys are reserved (`loop_counter`, `table`, 
`output`, `LoopStartId`, `LoopStartStateURI`), what `self.state` must contain 
by the time `open()` returns — are not encoded anywhere as code; they're string 
conventions shared across files.
   
   **@aglinxinyuan** — 2026-06-04:
   > Closed in 873bd33d87. Most of the substance had already landed; this 
commit adds the discoverability layer.
   >
   > | Concern | Status |
   > |---|---|
   > | Generator overrides `process_state` opaquely | **Fixed in 411d92f67** — 
the LoopStart/End generator templates collapsed to thin delegates. `LoopStart` 
does `yield self.eval_output($output, table)`; `LoopEnd` does 
`self.run_update($update, state)` and `return self.eval_condition($condition)`. 
All substantive logic lives in the Python base classes (`eval_output`, 
`run_update`, `eval_condition`). |
   > | Reserved names as string conventions | **Mostly encoded** in prior 
commits — `loop_counter` / `LoopStartId` / `LoopStartStateURI` are typed fields 
on `StateFrame` (`core/models/payload.py`), not string keys in user state. 
`table` / `output` were filtered out of `self.state` by hard-coded logic in 
each helper. |
   > | Logic split across 3 files | **Generator already collapsed**; 
`LoopStartOpDescSpec` / `LoopEndOpDescSpec` pin that the emitted code uses only 
the base helpers (`code should include("self.eval_output(")` etc.) and contains 
no `loop_counter` logic. |
   > | "Which method runs when" / "what `self.state` must contain after 
`open()` returns" — not encoded anywhere as code-level prose | **Closed here** 
— class-level docstrings on `LoopStartOperator` and `LoopEndOperator` now 
document the lifecycle, subclass contract, and reserved-name space inline with 
the code. Discoverable via `help(LoopStartOperator)`. |
   > | Reserved-key set had no single discoverable source of truth | **Closed 
here** — new `_RESERVED_STATE_KEYS = frozenset({"table", "output"})` constant; 
the three filter sites in `eval_output` / `run_update` / 
`produce_state_on_finish` now read against this single source. 
`TestReservedStateKeysConstant` pins the set's contents (and that envelope-only 
names like `loop_counter` are NOT in it). |
   >
   > 12/12 tests in `test_loop_operators.py` green (3 new + 9 existing). Diff 
scoped to `operator.py` (docstrings + constant + helper rewrites) and 
`test_loop_operators.py` (one new test class).
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L100** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435450)
   
   **@Copilot** — 2026-05-18:
   > Deriving the LoopStart operator id from `worker_id` by string-splitting on 
`-` and `-main-0` is brittle: it silently assumes the worker name ends with 
`-main-0` and that the operator id itself never contains the literal `-main-0`. 
`SpecialPhysicalOpFactory` and other code paths already produce layer names 
containing underscores/hyphens, so a future renaming of the layer suffix or 
worker index will break this without any error. Prefer using the operator 
identity already available from the worker's context (e.g., the parsed 
`ActorVirtualIdentity` / physical-op id) rather than re-parsing the worker id 
string.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L100** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892239)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `worker_id.split("-", 1)[1].rsplit("-main-0", 1)[0]` recovers the operator 
ID by chopping up the worker-name string. This depends on the exact 
worker-naming format. If the format ever changes — an extra dash, a different 
suffix, more than one worker per operator — the wrong ID is extracted silently, 
and the back-jump goes to the wrong operator with no error.
   
   **@aglinxinyuan** — 2026-06-01:
   > Good catch. Replaced the `-main-0` string-chop with a new 
`get_operator_id` helper in `core/util/virtual_identity.py` that parses the 
worker name with the shared `worker_name_pattern` — the same regex the engine 
already uses for `get_worker_index`, mirroring Scala 
`VirtualIdentityUtils.getPhysicalOpId`. It no longer assumes the layer name or 
worker index, correctly handles operator ids that contain dashes, and raises 
`ValueError` on an unrecognized worker id so a future naming change fails 
loudly instead of silently extracting the wrong id. Added `TestGetOperatorId` 
covering the dashed-op-id, non-`main`-layer / nonzero-index, and fail-loud 
cases. Fixed in 512841a78.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L107** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892245)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `next(iter(reader_runnables.values()))[0].uri` picks whichever input port 
happens to be first, with no check that there is only one. If LoopStart ever 
has more than one input, this silently picks one.
   
   **@aglinxinyuan** — 2026-05-22:
   > The design of LoopStart is fixed; it has only one input.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L108** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892237)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > This writes `LoopStartStateURI` — a storage path — into the state object 
that flows through every operator in the loop body, including user UDFs. The 
URI is internal runtime data; user code shouldn't see it or be able to write to 
it. LoopEnd already has the operator ID, so the URI doesn't need to travel 
through state.
   
   **@aglinxinyuan** — 2026-06-04:
   > The substantive concern is closed by prior work; this commit (a374596844) 
adds the defensive test.
   >
   > **1. URI is no longer in user state.** Commit `30ba48c39f` moved 
`loop_counter` / `LoopStartId` / `LoopStartStateURI` onto the typed 
`StateFrame` envelope (`core/models/payload.py`). The user-facing operator's 
`process_state(state, port)` callback only ever receives `frame.frame` (the 
inner `State` dict) — the envelope's scalar fields are siblings, not keys 
inside it. Grep returns zero `state["LoopStartStateURI"]` / 
`output_state["LoopStartStateURI"]` accesses anywhere; the runtime captures the 
URI into `MainLoop._loop_start_state_uri` (`main_loop.py:374-375`) and reads it 
from there in `_jump_to_loop_start` (`main_loop.py:138`).
   >
   > **2. New test pins the property end to end.** Added 
`test_user_state_excludes_envelope_metadata_on_consume_branch` in 
`test_main_loop.py`: builds a `StateFrame` with envelope metadata, drives it 
through `_process_state_frame` on the consume branch, and asserts (a) the 
runtime captured the envelope onto its instance fields, but (b) the 
operator-facing `state_processing_manager.current_input_state` carries only the 
inner State's keys — no `LoopStartId` / `LoopStartStateURI` / `loop_counter` as 
string keys. If a future refactor accidentally merges envelope fields into the 
inner state, the test breaks.
   >
   > **3. On the secondary hint** — *"LoopEnd already has the operator ID, so 
the URI doesn't need to travel through state"*: technically true but requires 
either a new controller RPC (LoopEnd asks the controller for LoopStart's input 
URI given the op id) or a runtime URI registry / canonical naming scheme. The 
envelope already separates the URI from user-visible state, so leaving the 
envelope field as-is keeps the back-edge write path simple. Happy to pursue 
removing the envelope field too if you'd prefer that direction — let me know.
   >
   > 24/24 `test_main_loop.py` tests green; diff scoped to the one test file.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L122** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892249)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `_attach_loop_start_id` and `_jump_to_loop_start` are the most fragile new 
methods — worker-name parsing, first-port assumption, state-key stripping, 
direct iceberg write — and they have no tests.
   
   **@aglinxinyuan** — 2026-06-04:
   > Most of the bullets here are already addressed by intermediate commits; 
this commit (f22738ecb6) closes the remaining gaps.
   >
   > | Concern | Status |
   > |---|---|
   > | Worker-name parsing (`split("-",1)[1].rsplit("-main-0",1)[0]`) | **Fixed 
in 512841a78b** — `_compute_loop_start_id` now delegates to 
`get_operator_id(...)` in `core/util/virtual_identity.py`, which is 
exhaustively tested in `test_virtual_identity.py` (canonical, hyphenated op id, 
non-main layer, digit-ending id, malformed inputs). The brittle inline parse is 
gone. |
   > | State-key stripping list `(LoopStartId, LoopStartStateURI, table, 
output)` | **Reduced in 30ba48c39f / 007a264b59 / 411d92f67** — `LoopStartId` / 
`LoopStartStateURI` / `loop_counter` now ride the `StateFrame` envelope, not 
user state. Only `(table, output)` (the user-exec runtime scratch) is still 
stripped; that strip is now tested. |
   > | First-port assumption (`next(iter(reader_runnables.values()))[0].uri`) | 
**Fixed in this commit** — `_compute_loop_start_id` raises `RuntimeError` if 
the input_manager reports more than one input port or more than one reader on 
the single port, instead of silently picking the dict iterator's first. |
   > | No tests | **Closed in this commit** — 7 new `test_main_loop.py` cases 
covering both methods: worker-id parse via `get_operator_id`, URI = 
state-channel of reader's result URI (so `VFSURIFactory.state_uri` isn't 
dropped), both defensive raises, the RPC with `target_operator_id` taken from 
the StateFrame envelope (never from user state), the scratch-key strip 
preserving user vars, and the exact iceberg-write contract (`create_document → 
writer("0") → put_one(State.to_tuple(0)) → close`). |
   >
   > Diff scoped to `main_loop.py` (defensive guards only) and 
`test_main_loop.py` (new cases). Loop integration coverage continues to live in 
`amber/src/test/integration/.../LoopIntegrationSpec.scala`. 23/23 tests in 
`test_main_loop.py` green.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L125** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435651)
   
   **@Copilot** — 2026-05-18:
   > The hardcoded state-dict keys `"LoopStartId"`, `"LoopStartStateURI"`, 
`"loop_counter"`, `"table"`, and `"output"` live in user state alongside 
arbitrary user variables (the `initialization` block writes into `self.state` 
via `exec(..., self.state)`). A user whose loop body uses any of these names 
(especially `table` or `output`, which are the documented defaults in the 
operator UI) will collide silently: their values get stripped on writeback, and 
`condition` evaluating `i < len(table)` may suddenly see a pickled bytes object 
after the strip. Consider namespacing the loop machinery under a single 
reserved key (e.g., `"__loop__": {...}`) so user state is untouched.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L137** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435473)
   
   **@Copilot** — 2026-05-18:
   > In `complete()`, `executor.condition()` is invoked for every 
`LoopEndOperator` worker, including nested LoopEnds whose `process_state` only 
saw the pass-through branch (`loop_counter > 0`) and therefore never executed 
`self.state = dict(state)`. In that case `self.state` either does not exist or 
is stale from a previous iteration, so `condition()` may raise 
`AttributeError`/`KeyError` or — worse — return a stale `True` and fire an 
unintended `jump_to_operator_region` writing garbage state back to LoopStart. 
Gate the `condition()`/jump on having actually absorbed a terminal state this 
iteration (e.g., remember whether `process_state` took the `loop_counter == 0` 
branch).
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L137** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892251)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > Two issues:
   > - `condition()` runs the user's Python expression with no error handling. 
If it throws (typo, wrong variable, divide-by-zero), `executor.close()` is 
skipped, the worker never finishes its state transition, and the workflow 
hangs. Same applies to `update`, `initialization`, `output`.
   > - Nothing stops an infinite loop. If the user's `condition` never returns 
False, the workflow runs forever with no iteration counter and no progress log.
   
   **@aglinxinyuan** — 2026-05-22:
   > 1. It's expected. We don't provide error handling for user-provided code, 
and it shares the same design as Python UDF.
   > 2. Infinite loop is valid.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L160** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851492)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > On thread 16, the "same as a Python UDF" reply holds for `update` / 
`initialization` / `output`, but not for `condition()`: it runs here on the 
main loop thread inside `complete()`, before `close()` and the COMPLETED 
transition — not on the guarded path a UDF error would take. A typo in 
`condition` fails the worker thread instead of being reported. Worth guarding 
this one call, or noting the difference.
   
   **@aglinxinyuan** — 2026-06-13:
   > Guarded it — fixed in 212687a8a6.
   >
   > You're right that `condition()` is the odd one out: `complete()` calls it 
on the main loop thread, before `close()` and the COMPLETED transition, outside 
`DataProcessor`'s guarded executor session. An exception there (a typo, an 
undefined name) propagated through `run()`'s `@logger.catch(reraise=True)` and 
killed the worker thread silently — the controller never learned of it.
   >
   > Now it's caught and reported the same way a UDF error is:
   > - record it on the exception manager,
   > - queue an **ERROR** console message,
   > - flush it, then enter `EXCEPTION_PAUSE` — **skipping the loop-back edge 
and completion**, so the worker pauses with the error showing instead of dying 
or falsely reporting success.
   >
   > To keep both paths reporting identically, I pulled the console-message 
build out of `DataProcessor._report_exception` into a shared 
`ConsoleMessageManager.report_exception(worker_id, exc_info)`; the data path 
and this main-loop path now call the same helper.
   >
   > Tests: added 
`test_complete_reports_loopend_condition_error_instead_of_crashing` — a Loop 
End whose `condition()` raises is recorded + reported (ERROR console message) + 
paused (`EXCEPTION_PAUSE`), with no loop-back and no completion. The full 
`main_loop`, `data_processor`, and loop-operator suites pass (47 tests); ruff 
clean.
   
   ---
   **`amber/src/main/python/core/runnables/main_loop.py` L267** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851498)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > `reset_storage()` runs for any LoopEnd with output state here, regardless 
of `loop_counter` — but the description says it only fires for the inner 
LoopEnd of a nested loop. See the note on `output_manager.py:241`.
   
   **@aglinxinyuan** — 2026-06-13:
   > You're right — the code is correct and the description was stale (and, 
worse, named the wrong branch). Fixed the description, and added a call-site 
comment in 3f11520450.
   >
   > `reset_output_storage()` fires **once per iteration for every Loop End**, 
on the matching-loop consume (`loop_counter == 0`). The nested pass-through 
(`loop_counter > 0`) is forwarded and returned in `_process_state_frame` 
*before* `process_input_state` runs, so it never resets. The old description 
had this backwards on three counts: it said inner-LoopEnd-only (it's every 
LoopEnd), it named the `loop_counter > 0` pass-through branch (reset is 
actually the `== 0` consume branch), and it claimed single loops never reset 
(they reset every iteration). The method's own docstring already states the 
correct behavior — *"once per loop iteration … each iteration must start from 
empty tables so the materialization holds only the final iteration's rows"* — 
so the description was the only wrong artifact.
   >
   > Why reset-every-iteration is correct, not a bug: dropping+recreating the 
Loop End's output each iteration is what keeps the materialization at the 
**final** iteration's rows instead of all iterations concatenated. Gating it to 
nested-inner-only would break the single-loop case, so no code change.
   >
   > Done:
   > * PR description "Worker output" row rewritten to match the code + 
docstring.
   > * Added a comment at the `reset_output_storage()` call site in 
`process_input_state` documenting the firing condition (where you were 
reading), so the next reader doesn't hit the same ambiguity.
   >
   > No behavior change. 33 main_loop + output_manager tests still pass.
   
   ---
   
**`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala`
 L583** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851511)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > **Resolved threads 21 and 22, but the fixes aren't in the branch.** The 
replies cited commits (`ca9e5ce8cc`, `540b7ba274`, `bbec98282e`) that aren't 
here. The flag is still the type-named `isLoopEnd`, and this skip-create branch 
is still untested. Please rename it to describe the behavior, and add a test 
that pre-creates the documents and checks they're reused, not recreated.
   
   **@aglinxinyuan** — 2026-06-13:
   > Both halves now in the branch.
   >
   > **Rename** (landed in 3d4f15b445): `isLoopEnd` → 
`reusesOutputStorageOnReExecution` on `PhysicalOp` (+ 
`withReusesOutputStorageOnReExecution`), the `RegionExecutionCoordinator` 
guard, `LoopEndOpDesc`, and the specs.
   >
   > **Test for the skip-create branch** (043ed302c3): I pulled the 
create-or-reuse decision out of the private `createOutputPortStorageObjects` 
into a pure companion method:
   >
   > ```scala
   > def provisionOutputDocument(uri, reuseExistingStorage, documentExists, 
createDocument): Boolean
   > ```
   >
   > with the storage ops injected, so the decision is unit-testable without an 
iceberg backend or a live region. `RegionOutputProvisioningSpec` pins the four 
cases with a `createDocument` spy — directly the "pre-create, then check reused 
not recreated" you asked for:
   >
   > * **reuse + existing document → NOT recreated** (createDocument never 
called) — accumulated loop output survives the re-run;
   > * reuse + no document yet → created (first iteration);
   > * no-reuse + existing → recreated/overwritten (fresh every run);
   > * no-reuse + none → created;
   > * plus: no-reuse short-circuits and never even probes `documentExists`.
   >
   > On verification: I confirmed the production change compiles — the only 
remaining `amber` compile errors are the pre-existing `PveManager` / 
`virtual_environments` JOOQ issue from #5577 (unrelated; my local DB isn't 
migrated with that table, CI builds against a fresh schema). The new spec is a 
pure ScalaTest unit with no iceberg/actor dependency, so it runs in the normal 
amber test job.
   >
   > (For context on why these kept showing as "cited but not in the branch": 
the branch has been force-rebased a few times, which dropped the earlier 
commits — re-applied now on the current tip.)
   
   ---
   
**`amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala`
 L589** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892266)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > The skip-create branch has no tests. Without this change, loops would wipe 
their own output every iteration — so this is a load-bearing invariant with no 
coverage.
   
   **@aglinxinyuan** — 2026-05-27:
   > Added coverage in ca9e5ce8cc.
   >
   > Extracted the per-document create-or-reuse decision out of the private 
`createOutputPortStorageObjects` into a testable 
`RegionExecutionCoordinator.provisionOutputDocument(uri, schema, 
reuseIfExists)`, then added `RegionExecutionCoordinatorSpec` exercising the 
full truth table against a real iceberg-backed `DocumentFactory`:
   >
   > | reuseIfExists | doc state before | expected after |
   > |---|---|---|
   > | false | absent | empty doc created |
   > | true | absent (1st iteration) | empty doc created |
   > | **true** | **has 3 rows** | **3 rows preserved** ← the load-bearing 
invariant |
   > | false | has 3 rows | wiped to 0 (contrast case) |
   >
   > The third case is the one you flagged: it proves a re-executing loop 
region does **not** wipe the output its previous iterations accumulated. The 
fourth shows the non-reuse path still starts fresh, so the guard is actually 
doing something. No behavior change — `createOutputPortStorageObjects` just 
delegates to the helper now.
   
   ---
   
**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
 L71** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892233)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > Two issues:
   > - The execution service uses `isInstanceOf[LoopStartOpDesc]` to detect 
loops, which makes a generic service depend on a specific operator class.
   > - This rule (force MATERIALIZED if a LoopStart is in the plan) has no 
tests.
   
   **@aglinxinyuan** — 2026-05-27:
   > Both addressed in 1848ce00fb.
   >
   > **1. Generic service no longer depends on a specific operator class.** 
Added `LogicalOp.requiresMaterializedExecution` (default `false`); 
`LoopStartOpDesc` and `LoopEndOpDesc` override it to `true`. 
`WorkflowExecutionService` now checks that flag instead of 
`isInstanceOf[LoopStartOpDesc]` — the `LoopStartOpDesc` import is gone, and any 
future operator that needs materialized edges just sets the flag.
   >
   > **2. The rule is now tested.** Extracted it into a pure, side-effect-free 
`WorkflowExecutionService.resolveWorkflowSettings(operators, requested)` (the 
constructor body just delegates), and added `WorkflowExecutionServiceSpec`:
   >
   > * `requiresMaterializedExecution` is true for `LoopStart`/`LoopEnd`, false 
for a non-loop op (`SleepOpDesc`);
   > * loop present (incl. a plan mixing loop + non-loop ops) → coerced to 
MATERIALIZED;
   > * `LoopEnd` alone also coerces;
   > * non-loop op and empty plan pass through unchanged;
   > * idempotent when the user already chose MATERIALIZED;
   > * only the `executionMode` field changes (other settings preserved).
   >
   > 7 tests, all passing.
   
   ---
   
**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
 L74** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892230)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > When the workflow contains a Loop Start, this block forces the execution 
mode to MATERIALIZED. The frontend setting still displays whatever the user 
picked. The UI and the running system disagree, and the user has no way to know.
   
   **@aglinxinyuan** — 2026-05-27:
   > Fixed in 30bf1cd136 by failing loud instead of silently coercing.
   >
   > `WorkflowExecutionService.validateExecutionMode` now throws an 
`IllegalArgumentException` when the plan contains an operator that requires 
materialized execution but the requested mode isn't MATERIALIZED:
   >
   > > This workflow contains operators that require MATERIALIZED execution 
mode (e.g. Loop Start / Loop End). Please set the execution mode to 
Materialized in the workflow settings and run again.
   >
   > `WorkflowService` already wraps the constructor in `try { ... } catch { 
case e => errorHandler(e) }`, so this surfaces to the UI as a fatal workflow 
error — the user is told to switch the mode and re-run, rather than the UI and 
engine silently disagreeing. The requirement is still keyed off the generic 
`LogicalOp.requiresMaterializedExecution` flag (no operator-class dependency), 
and the spec covers it: loop + non-MATERIALIZED throws (including a 
LoopEnd-only plan), while a loop already set to MATERIALIZED, a non-loop op, 
and an empty plan all pass.
   
   **@aglinxinyuan** — 2026-06-09:
   > <img width="1417" height="764" alt="image" 
src="https://github.com/user-attachments/assets/d37f82df-535c-44ba-a6d9-cd1a6525e4df";
 />
   
   ---
   
**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
 L75** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435635)
   
   **@Copilot** — 2026-05-18:
   > The coercion to `MATERIALIZED` happens only when a `LoopStartOpDesc` is 
present, but `LoopEndOpDesc` is what actually requires the iceberg output to 
persist across region invocations (see `RegionExecutionCoordinator`'s 
`isLoopEnd` branch). A workflow with a `LoopEndOpDesc` but no `LoopStartOpDesc` 
(e.g., malformed, partially-edited, or in a future use of LoopEnd as a 
standalone control op) would silently slip through pipelined mode. Also 
consider also checking for `LoopEndOpDesc` to keep both halves in sync.
   
   ---
   
**`amber/src/main/scala/org/apache/texera/web/service/WorkflowExecutionService.scala`
 L78** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851505)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > **Resolved thread 11, but the fix isn't in the branch.** The reply 
described a generic `LogicalOp.requiresMaterializedExecution` flag and a test 
(commit `1848ce00fb`); neither exists. Line 78 still uses 
`isInstanceOf[LoopStartOpDesc]`, tying this service to one operator class, with 
no test. It also only checks LoopStart, so a plan with only a LoopEnd would 
skip the check. Please re-open.
   
   **@aglinxinyuan** — 2026-06-13:
   > Re-applied in 7fd110a833 (it was lost in a force-rebase).
   >
   > All three points:
   >
   > 1. **No longer tied to a class.** Added 
`LogicalOp.requiresMaterializedExecution` (default false); `LoopStartOpDesc` 
and `LoopEndOpDesc` both override it to true. `WorkflowExecutionService` checks 
the flag, so the `LoopStartOpDesc` import/`isInstanceOf` is gone — any future 
operator needing materialization just sets the flag.
   >
   > 2. **LoopEnd-only no longer slips through.** Because the check is 
`operators.exists(_.requiresMaterializedExecution)` and LoopEnd sets the flag 
too, a plan with a LoopEnd but no LoopStart now triggers the guard. There's an 
explicit test for that case.
   >
   > 3. **Now tested.** Extracted the logic into a pure 
`WorkflowExecutionService.validateExecutionMode(operators, settings)` (the 
constructor delegates to it), and added `WorkflowExecutionServiceSpec`:
   >    * `requiresMaterializedExecution` true for LoopStart/LoopEnd, false for 
a non-loop op (SleepOpDesc);
   >    * loop + PIPELINED → throws (incl. the LoopEnd-only plan);
   >    * loop + MATERIALIZED, non-loop + PIPELINED, and an empty plan → no 
throw.
   >
   > Verified locally (after migrating my local DB for the unrelated #5577 JOOQ 
table): 6 new tests pass, the 29 LoopStart/LoopEnd op-desc specs pass, and 
amber compiles. The behavior stays fail-loud (throw with an actionable 
message), per the earlier decision on this thread.
   
   ---
   
**`amber/src/test/integration/org/apache/texera/amber/engine/e2e/LoopIntegrationSpec.scala`
 L160** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851518)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > Both cases only check that the workflow reaches COMPLETED, then `succeed` 
— they never read the output or the iteration count. For a loop, the iteration 
count is the main thing to verify: a counter bug that still terminates would 
pass this test. The description's "3 iterations" / "9 nested" are what to 
assert here.
   
   **@aglinxinyuan** — 2026-06-13:
   > Fixed in 962b6c1401 — both tests now assert the iteration count, not just 
termination.
   >
   > Each test captures the terminal LoopEnd's cumulative output-tuple count 
from `ExecutionStatsUpdate` (keyed by logical op id; the controller delivers it 
after querying final worker stats, before `ExecutionStateUpdate(COMPLETED)`; 
the worker persists across the `JumpToOperatorRegion` re-executions, so its 
output statistic accumulates across iterations rather than resetting). Since 
`LoopEnd.process_table` is an identity pass-through (`yield table`), by 
conservation that count equals the number of rows that flowed through the loop 
= the iteration count.
   >
   > * **Single loop**: `assert(LoopEnd output == 3)` — `i` advances 0,1,2 and 
stops at `i == 3`. An off-by-one that still terminated lands on 2 or 4.
   > * **Nested loop**: I also had to fix the workflow — the outer `LoopStart` 
previously emitted `table.iloc[i]` (one row), so the inner loop would have run 
only once per outer iteration (3 total), **not** the 9 the comment claimed. It 
now emits the whole `table` (matching Nested.Loop.json), so the inner body 
genuinely runs 3 × 3 = 9 times; `assert(outer LoopEnd output == 9)`.
   >
   > I asserted only the terminal LoopEnd counts because they're robust by the 
identity-passthrough conservation argument regardless of nested 
region-scheduling details; the `9` also matches the Nested.Loop.json run in the 
PR description. Note these are `@IntegrationTest` (postgres + MinIO + Python 
workers), so the numbers will be confirmed by the `amber-integration` CI job 
rather than locally — flagging in case the nested count needs a tweak once it 
runs there.
   
   ---
   **`amber/src/test/python/core/models/test_loop_operators.py` L1** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892281)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > Verify CI picks up this path. Other Python sources are under 
`amber/src/main/python/...`; if `amber/src/test/python/...` isn't included in 
the runner config, these tests silently don't run.
   
   **@aglinxinyuan** — 2026-05-22:
   > Verified — the tests are picked up:
   >
   > * `amber/pyproject.toml` declares `testpaths = ["src/test/python"]`, so 
any `pytest` run from `amber/` discovers anything under `src/test/python/...`.
   > * `.github/workflows/build.yml` line 626 runs `cd amber && pytest -m "not 
integration" ...` from that directory.
   > * On the most recent CI run for this PR ([python 3.10 
job](https://github.com/apache/texera/actions/runs/26308949081/job/77452693976)),
 all 12 `test_loop_operators.py` tests ran and passed — grep 
`test_loop_operators` in the log returns 12 PASSED lines.
   
   ---
   **`amber/src/test/python/core/models/test_loop_operators.py` L231** · [view 
on #4206](https://github.com/apache/texera/pull/4206#discussion_r3285892285)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `pytest.raises(TypeError, match="condition")` matches on Python's "missing 
abstract method" error text. That wording has changed between CPython versions 
before. Match on `"abstract"` instead, or drop `match`.
   
   **@aglinxinyuan** — 2026-05-22:
   > Fixed in c852a7daa0 — `match="abstract"` now, plus a comment recording why 
(CPython's missing-abstract-method message wording has changed between releases 
but always contains "abstract").
   
   ---
   
**`common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala`
 L201** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892270)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > `isLoopEnd` names the flag after one specific operator type. What the 
scheduler actually checks is more general — "this operator's output must 
survive across re-runs of its region." If anything else ever needs the same 
behavior, we'll either reuse a misnamed flag or add a near-duplicate.
   
   **@aglinxinyuan** — 2026-05-27:
   > Good point — fixed in 540b7ba274, with the flag renamed in bbec98282e.
   >
   > The branch had since moved to detecting loop ends by string-matching the 
operator id (`startsWith("LoopEnd-operator-")`), which keys the behavior to the 
operator type even more tightly than `isLoopEnd` did. Replaced both with a 
behavior-named flag:
   >
   > * `PhysicalOp.reusesOutputStorageOnReExecution: Boolean = false` — 
documented as "this operator's output storage should be reused (reopened) 
rather than recreated fresh when its region is executed more than once", 
explicitly noting any operator can set it, not just Loop End.
   > * `LoopEndOpDesc` sets it via 
`.withReusesOutputStorageOnReExecution(true)`.
   > * `RegionExecutionCoordinator` now checks 
`region.getOperators.exists(_.reusesOutputStorageOnReExecution)` instead of the 
id prefix.
   >
   > The name states exactly what the scheduler does — reopen existing output 
storage on a region re-run instead of recreating it — so it reasons about the 
property rather than the operator, and a future operator needing the same 
treatment just sets the flag.
   
   ---
   
**`common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala`
 L201** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3400851514)
   
   **@Xiao-zhen-Liu** — 2026-06-12:
   > **Resolved thread 22, but the rename isn't in the branch.** Still 
`isLoopEnd` — named after the operator, not after the behavior the scheduler 
checks ("keep this operator's output across a region re-run"). Renaming it lets 
the next operator that needs the same behavior reuse it without a misleading 
name.
   
   **@aglinxinyuan** — 2026-06-13:
   > You're right — the rename was lost in a rebase. Re-applied in 3d4f15b445.
   >
   > `isLoopEnd` → `reusesOutputStorageOnReExecution` (named after the behavior 
the scheduler checks — "keep this operator's output storage across a region 
re-run" — not after LoopEnd):
   >
   > * `PhysicalOp`: field + `withIsLoopEnd` → 
`withReusesOutputStorageOnReExecution`, with a doc comment stating the behavior 
and that any operator needing it can set it.
   > * `RegionExecutionCoordinator`: the skip-recreate guard now checks 
`region.getOperators.exists(_.reusesOutputStorageOnReExecution)` (local val + 
comment reworded to the behavior).
   > * `LoopEndOpDesc` sets `.withReusesOutputStorageOnReExecution(true)`.
   > * Loop specs + mixin updated.
   >
   > So the next operator that must preserve its output across a region 
re-execution can reuse the flag without a LoopEnd-specific misnomer.
   >
   > Verified: WorkflowCore + WorkflowOperator compile and all 29 
LoopStart/LoopEnd op-desc specs pass. (Heads up: the `amber` module's local 
compile is currently blocked by an unrelated pre-existing issue — `PveManager` 
from #5577 references the `virtual_environments` table, which my un-migrated 
local DB lacks, so JOOQ codegen omits it; that's environmental and CI compiles 
against a fresh schema. The `RegionExecutionCoordinator` edit is a one-token 
rename to the now-existing field.)
   
   ---
   
**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala`
 L83** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435405)
   
   **@Copilot** — 2026-05-18:
   > Same problem as `LoopStartOpDesc`: `update` and `condition` are 
interpolated directly into double-quoted `exec(...)` literals. Any expression 
containing `"`, `\`, or a newline will produce invalid Python, and the values 
are effectively executable code-injection sites. Inline them as plain Python 
statements (e.g., on their own indented line) or escape them defensively.
   
   ---
   
**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala`
 L37** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892274)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > User expressions execute against `self.state`, which the runtime also uses 
to store `loop_counter`, `table`, `output`, `LoopStartId`, `LoopStartStateURI`. 
A user writing `loop_counter = 0` in their `initialization`, or `table = ...`, 
silently overwrites loop machinery — with no warning. Same applies to `update` 
/ `condition` in `LoopEndOpDesc`.
   
   **@aglinxinyuan** — 2026-06-03:
   > Addressed across 411d92f67 (plus the earlier loop_counter / LoopStartId / 
LoopStartStateURI moves). The reserved names no longer share the user exec 
namespace:
   >
   > - `loop_counter`, `LoopStartId`, `LoopStartStateURI` are now entirely off 
`State` -- they ride the `StateFrame` envelope as their own materialized 
columns, owned by the runtime, so they never appear in `self.state` and user 
code cannot touch them.
   > - `table` and `output`: each user expression (`initialization` / `output` 
/ `update` / `condition`) now runs in a throwaway namespace seeded with the 
user loop variables plus the input `table`; `output` is read back out of it, 
and only the user variables are persisted into `self.state`. So `table` stays 
readable, but neither reserved name can persist in -- or be silently clobbered 
out of -- the loop state.
   >
   > A user writing `loop_counter = 0`, `table = ...`, etc. in their loop code 
can therefore no longer overwrite loop machinery. The exec logic lives in 
tested LoopStart/LoopEnd base helpers (`eval_output` / `run_update` / 
`eval_condition`); the generated operators just delegate.
   
   ---
   
**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala`
 L73** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892278)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > The user's expression is dropped into a string literal: 
`exec("$initialization", {}, self.state)`. With `initialization = 'name = 
"foo"'`, the generator produces `exec("name = "foo"", ...)` — invalid Python. 
The error doesn't surface until the operator's class loads at run time, far 
from where the user typed the expression. Multi-line input has the same 
problem. The same pattern appears in `LoopEndOpDesc.scala:78-83`.
   
   **@aglinxinyuan** — 2026-06-04:
   > Fixed in 9ec60f07cc.
   >
   > Switched the four user fields (`initialization`, `output`, `update`, 
`condition`) to `EncodableString` and rewrote both `generatePythonCode()` with 
the `pyb"..."` macro from `common/pybuilder`. The macro base64-encodes each 
splice at build time and emits a `self.decode_python_template('<b64>')` call at 
runtime — the raw user text never enters the generated source, so 
quotes/newlines/backslashes/etc. can't break it. It also rejects (at compile 
time) any splice placed inside a quoted string, so the old `"$initialization"` 
pattern can't sneak back in. The surrounding `"..."` around the `exec(...)` 
arguments is gone because the decoder already returns a Python `str`; for the 
`output = ` / `condition` branches the literal prefix stays inline 
(`exec("output = " + $output, ...)`), so the runtime concatenates the literal 
with the decoded expression.
   >
   > Added `LoopOpDescsSpec` (new — there was no spec for the loop OpDescs) 
covering both: subclassing of `LoopStart/EndOperator`, the four `exec` call 
sites use `self.decode_python_template`, a tricky value with `"`, `'`, `\n`, 
`\` does NOT appear verbatim in the generated source, and empty-default fields 
still produce a parseable template. 10 tests, all passing.
   >
   > Diff scoped to the two OpDescs + the new spec; no unrelated churn.
   
   ---
   
**`common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopStartOpDesc.scala`
 L76** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3262435359)
   
   **@Copilot** — 2026-05-18:
   > User-supplied expressions are interpolated raw into a double-quoted Python 
`exec(...)` string. Any double quote, backslash, or newline in `initialization` 
or `output` will produce a Python SyntaxError at operator-build time and is a 
code-injection vector (e.g., an `output` containing `"); import os; 
os.system("...` would escape the exec call entirely). Consider injecting these 
expressions as proper indented Python source rather than as `exec` string 
literals, or at minimum escape the value (e.g., wrap in triple-quotes and 
reject embedded triple-quotes).
   
   ---
   
**`common/workflow-operator/src/test/scala/org/apache/texera/amber/operator/loop/LoopEndOpDescSpec.scala`
 L1** · [view on 
#4206](https://github.com/apache/texera/pull/4206#discussion_r3285892288)
   
   **@Xiao-zhen-Liu** — 2026-05-22:
   > Two test-suite issues:
   > - Nothing in the tests actually compiles the generator's output. The Scala 
specs check that the generated string contains expected substrings; the Python 
tests use hand-written stub classes that mimic what the generator should 
produce. So a quote or newline in user input that breaks the codegen 
(`LoopStartOpDesc.scala:70-73` / `LoopEndOpDesc.scala:78-83`) passes both test 
layers and only fails at run time.
   > - `LoopStartOpDescSpec` and `LoopEndOpDescSpec` duplicate about 80% of 
their scaffolding.
   
   **@aglinxinyuan** — 2026-05-22:
   > Both addressed in d41918f461.
   >
   > **1. Codegen robustness.** Both `LoopStartOpDesc.generatePythonCode` and 
`LoopEndOpDesc.generatePythonCode` are now built with the `pyb"..."` 
interpolator, with `initialization` / `output` / `update` / `condition` typed 
as `EncodableString`. Every user value is base64-encoded at build time and 
rendered as `self.decode_python_template(<b64>)` instead of being inlined as a 
raw quoted substring. A `"` / `` / `\n` / `\` in user input therefore cannot 
escape into the surrounding Python syntax. Added 8 new tests (4 per spec) that 
exercise these exact tricky inputs and assert the raw text is **absent** from 
the generated source while the expected `decode_python_template`-wrapped 
substring is present.
   >
   > **2. Spec deduplication.** Extracted `LoopOpDescSpecMixin` carrying the 
`workflowId` / `executionId` vals, the `b64` / `decodeExpr` helpers, and the 
shared physical-op assertions (`assertNonParallelizableSingleWorker`, 
`assertPortsCarriedForward`, `assertOpExecWithPythonCodeForClass`, 
`assertUserInputIsBase64Wrapped`). Both specs are now focused on the 
per-operator differences only.
   
   </details>


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to