Copilot commented on code in PR #5700:
URL: https://github.com/apache/texera/pull/5700#discussion_r3408778138


##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+import 
org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
+
+class LoopEndOpDesc extends LoopOpDesc {
+  @JsonProperty(required = true, defaultValue = "i += 1")
+  @JsonSchemaTitle("Update")
+  var update: EncodableString = ""
+
+  @JsonProperty(required = true, defaultValue = "i < len(table)")
+  @JsonSchemaTitle("Condition")
+  var condition: EncodableString = ""
+
+  override protected def operatorName: String = "Loop End"
+
+  override protected def operatorDescription: String =
+    "Close a loop body and decide whether to iterate again based on a 
condition; pairs with Loop Start."
+
+  override protected def reusesOutputStorage: Boolean = true
+
+  // User-supplied `update` and `condition` are interpolated via the `pyb`
+  // builder, which base64-encodes each EncodableString and renders it as a
+  // `self.decode_python_template('<b64>')` expression. So an arbitrary user
+  // string -- including quotes, newlines, or backslashes -- can never break 
the
+  // surrounding Python syntax, because the text is no longer pasted in raw.
+  override def generatePythonCode(): String = {
+    pyb"""
+       |from pytexera import *
+       |class ProcessLoopEndOperator(LoopEndOperator):
+       |    @overrides
+       |    def process_state(self, state: State, port: int) -> 
Optional[State]:
+       |      self.run_update($update, state)
+       |      return None
+       |
+       |    @overrides
+       |    def condition(self) -> bool:
+       |      return self.eval_condition($condition)

Review Comment:
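   This line is also indented 6 spaces instead of 8 in the generated Python 
(the enclosing `def` is at 4). As quoted, the method body is consistently 
indented deeper than its `def`, so Python will still parse it, but the 2-space 
step is inconsistent with the 4-space steps used by the rest of the template 
and is fragile if the body is ever edited. Re-indent it to 8 spaces.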



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +609,25 @@ class RegionExecutionCoordinator(
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
-        DocumentFactory.createDocument(resultURI, schema)
-        DocumentFactory.createDocument(stateURI, State.schema)
+        // Operators that reuse their output storage across region re-runs
+        // (e.g. LoopEnd, whose output accumulates across the iterations of its
+        // own loop) already have their result/state documents from a prior
+        // run; on re-execution `createDocument` (overrideIfExists=true) would
+        // clobber them, so reuse the existing document when it is already
+        // there. (The inner LoopEnd of a nested loop additionally drops its
+        // output once per outer iteration -- on the Python worker side in
+        // MainLoop._process_state_frame -- which is orthogonal to this
+        // region-provisioning reuse.)
+        val reusesOutputStorage = 
region.getOperators.exists(_.reusesOutputStorageOnReExecution)

Review Comment:
   `reusesOutputStorage` is computed for the whole region (`exists`) and then 
applied to every output port. If a region contains a LoopEnd plus other 
operators whose outputs should be recreated per re-execution, this will 
incorrectly reuse their output documents across iterations and preserve 
stale/duplicate data. The reuse decision should be per-operator (the operator 
that owns `outputPortId.opId`).



##########
common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/loop/LoopEndOpDesc.scala:
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.amber.operator.loop
+
+import com.fasterxml.jackson.annotation.JsonProperty
+import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
+import org.apache.texera.amber.pybuilder.PyStringTypes.EncodableString
+import 
org.apache.texera.amber.pybuilder.PythonTemplateBuilder.PythonTemplateBuilderStringContext
+
+class LoopEndOpDesc extends LoopOpDesc {
+  @JsonProperty(required = true, defaultValue = "i += 1")
+  @JsonSchemaTitle("Update")
+  var update: EncodableString = ""
+
+  @JsonProperty(required = true, defaultValue = "i < len(table)")
+  @JsonSchemaTitle("Condition")
+  var condition: EncodableString = ""
+
+  override protected def operatorName: String = "Loop End"
+
+  override protected def operatorDescription: String =
+    "Close a loop body and decide whether to iterate again based on a 
condition; pairs with Loop Start."
+
+  override protected def reusesOutputStorage: Boolean = true
+
+  // User-supplied `update` and `condition` are interpolated via the `pyb`
+  // builder, which base64-encodes each EncodableString and renders it as a
+  // `self.decode_python_template('<b64>')` expression. So an arbitrary user
+  // string -- including quotes, newlines, or backslashes -- can never break 
the
+  // surrounding Python syntax, because the text is no longer pasted in raw.
+  override def generatePythonCode(): String = {
+    pyb"""
+       |from pytexera import *
+       |class ProcessLoopEndOperator(LoopEndOperator):
+       |    @overrides
+       |    def process_state(self, state: State, port: int) -> 
Optional[State]:
+       |      self.run_update($update, state)
+       |      return None

Review Comment:
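   The generated Python in this template has inconsistent indentation: method 
body lines are indented 6 spaces instead of 8 (the enclosing `def` is at 4). 
Because both body lines share the same 6-space indent, Python will still parse 
the block rather than raise an IndentationError, but the 2-space step breaks 
the template's 4-space convention and becomes an error as soon as a line with 
a different indent is added. Re-indent the body to 8 spaces.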



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -203,37 +195,90 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, 
port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
-    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> 
None:
+    def save_state_to_storage_if_needed(
+        self,
+        state: State,
+        loop_counter: int,
+        loop_start_id: str = "",
+        loop_start_state_uri: str = "",
+        port_id=None,
+    ) -> None:
         # When port_id is omitted the same state row is fanned out to
         # every output port's state table. This mirrors the
         # broadcast-to-all-workers behavior on the emit side: state is
         # shared context, not per-key data, so every downstream operator
         # (and every worker reading the materialization) needs the full
         # set.
-        element = PortStorageWriterElement(data_tuple=state.to_tuple())
+        element = PortStorageWriterElement(
+            data_tuple=state.to_tuple(loop_counter, loop_start_id, 
loop_start_state_uri)
+        )
         if port_id is None:
             for writer_queue, _, _ in self._port_state_writers.values():
                 writer_queue.put(element)
         elif port_id in self._port_state_writers:
             self._port_state_writers[port_id][0].put(element)
 
+    def reset_output_storage(self) -> None:
+        """Drop and recreate this operator's result and state tables, then
+        reopen the storage writers against the empty tables.
+
+        Called only for the *inner* Loop End of a *nested* loop, once per
+        *outer* iteration, from the outer pass-through branch in
+        ``MainLoop._process_state_frame`` (the ``loop_counter > 0`` branch).
+        A Loop End accumulates the results of all of its own iterations; the
+        inner Loop End must, in addition, drop the previous outer iteration's
+        rows when the outer loop advances, so each outer iteration accumulates
+        from empty rather than concatenating across outer iterations. A single
+        / outermost Loop End never reaches that branch and so never resets --
+        it keeps all of its iterations.
+
+        Truncating live storage is safe here because a workflow containing a
+        loop runs in MATERIALIZED execution mode: downstream operators do not
+        start reading this output until the loop region has fully completed,
+        so no reader can observe an intermediate truncation.
+
+        Preconditions (always satisfied by a Loop End worker): the operator
+        has exactly one output port, and ``set_up_port_storage_writer`` has
+        already run for it (so ``_storage_uri_base`` is populated). Both are
+        checked so future misuse fails loudly instead of silently resetting
+        the wrong port or dereferencing ``None``.
+        """
+        port_ids = self.get_port_ids()
+        if len(port_ids) != 1:
+            raise RuntimeError(
+                f"reset_output_storage expects exactly one output port, "
+                f"but found {len(port_ids)}"
+            )
+        if self._storage_uri_base is None:
+            raise RuntimeError(
+                "reset_output_storage called before the output port's storage "
+                "writer was set up"
+            )
+        port_id = port_ids[0]
+        storage_uri_base = self._storage_uri_base
+        self.close_port_storage_writers()
+        DocumentFactory.create_document(
+            VFSURIFactory.result_uri(storage_uri_base),
+            self._ports[port_id].get_schema(),
+        )
+        DocumentFactory.create_document(
+            VFSURIFactory.state_uri(storage_uri_base), State.SCHEMA
+        )
+        self.set_up_port_storage_writer(port_id, storage_uri_base)
+
     def close_port_storage_writers(self) -> None:
         """
         Flush the buffers of port storage writers and wait for all the
         writer threads to finish, which indicates the port storage writing
         are finished.
         """
-        for _, writer, _ in self._port_storage_writers.values():
-            # This non-blocking stop call will let the storage writers
-            # flush the remaining buffer
-            writer.stop()
-        for _, _, writer_thread in self._port_storage_writers.values():
-            # This blocking call will wait for all the writer to finish commit
-            writer_thread.join()
-        for _, state_writer, _ in self._port_state_writers.values():
-            state_writer.stop()
-        for _, _, state_writer_thread in self._port_state_writers.values():
-            state_writer_thread.join()
+        for registry in (self._port_storage_writers, self._port_state_writers):
+            # Non-blocking stop lets each writer flush its remaining buffer;
+            # the join then waits for the commit to finish.
+            for _, writer, _ in registry.values():
+                writer.stop()
+            for _, _, thread in registry.values():
+                thread.join()
         self._port_state_writers.clear()

Review Comment:
   `close_port_storage_writers` stops and joins both result and state writer 
threads, but only clears `_port_state_writers`. Leaving `_port_storage_writers` 
populated keeps references to stopped threads/writers and can cause repeated 
close/reset flows to act on stale entries. Clear both registries after shutdown.



##########
amber/src/main/python/core/util/virtual_identity.py:
##########
@@ -24,18 +24,34 @@
     ActorVirtualIdentity,
 )
 
-worker_name_pattern = re.compile(r"Worker:WF\d+-.+-(\w+)-(\d+)")
+worker_name_pattern = re.compile(r"Worker:WF(\d+)-(.+)-(\w+)-(\d+)")

Review Comment:
   The regex uses `re.match` without anchoring to the end of the string, so 
malformed worker ids with trailing junk (e.g. `Worker:WF1-op-main-0XYZ`) will 
still parse. This contradicts the docstring claim that it mirrors Scala 
`VirtualIdentityUtils.getPhysicalOpId` (which requires a full match). Anchor 
the pattern (or switch to `fullmatch`) so invalid ids fail loudly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to