Copilot commented on code in PR #4490:
URL: https://github.com/apache/texera/pull/4490#discussion_r3185360832


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +234,22 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    try {
+      storageUris.foreach { uri =>
+        val writer = DocumentFactory
+          .openDocument(State.uriFromResultUri(uri))
+          ._1
+          .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+          .asInstanceOf[BufferedItemWriter[Tuple]]
+        writer.putOne(state.toTuple)
+        writer.close()
+      }

Review Comment:
   `saveStateToStorageIfNeeded` opens the state document, creates a writer, and 
closes it for every state and every output storage URI. Because Iceberg commits 
happen on `close()`, this can create many small files/snapshots and add 
significant overhead. Consider reusing a long-lived buffered writer (similar to 
tuple storage writer threads) and flushing on port completion instead of per state.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.uriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()
+
+        while (stateReadIterator.hasNext) {
+          val state = State.fromTuple(stateReadIterator.next())
+          inputMessageQueue.put(
+          FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))
+          )
+        }

Review Comment:
   Materialized state is enqueued only after all tuples have been read/flushed, 
which means downstream will observe state updates after the full data stream. 
Operators like `IfOpExec` rely on state to affect how subsequent tuples are 
routed/processed, so state should be emitted before (or otherwise in a 
well-defined order relative to) the tuples read from materialization.
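
   One possible ordering, sketched in Python with stand-in types: replay all 
materialized states first, then the tuple batches. The function name, the 
`("state", ...)`/`("data", ...)` message shapes, and `batch_size` are 
illustrative assumptions, not the Texera API.

```python
from queue import Queue
from typing import Any, Iterable, List


def replay_materialization(
    states: Iterable[Any],
    tuples: Iterable[Any],
    out: Queue,
    batch_size: int = 2,
) -> None:
    """Enqueue every materialized state before any tuple batch (hypothetical helper)."""
    # States go first so downstream operators see them before the data they affect.
    for state in states:
        out.put(("state", state))

    # Then replay the tuples in batches, flushing the remainder at the end.
    buffer: List[Any] = []
    for tuple_ in tuples:
        buffer.append(tuple_)
        if len(buffer) >= batch_size:
            out.put(("data", buffer))
            buffer = []
    if buffer:
        out.put(("data", buffer))
```

If state and tuples were interleaved in the original stream, "states first" is 
only an approximation; preserving the exact interleaving would need an ordering 
key stored alongside both.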



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala:
##########
@@ -126,6 +126,7 @@ class DataProcessor(
       val outputState = executor.processState(state, port)
       if (outputState.isDefined) {
         outputManager.emitState(outputState.get)
+        outputManager.saveStateToStorageIfNeeded(state)

Review Comment:
   `saveStateToStorageIfNeeded` is called with the *input* `state`, but the 
executor may transform/drop state via `processState`. This likely materializes 
the wrong state (and can diverge from what was actually emitted). Pass the 
emitted `outputState.get` (or otherwise clarify which state should be 
persisted).
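
   A minimal Python sketch of the suggested fix, assuming the emitted state is 
the one that should be persisted. `process_and_persist` and its callback 
parameters are hypothetical stand-ins for `executor.processState`, 
`outputManager.emitState`, and `outputManager.saveStateToStorageIfNeeded`.

```python
from typing import Any, Callable, Optional


def process_and_persist(
    state: Any,
    process_state: Callable[[Any], Optional[Any]],
    emit: Callable[[Any], None],
    save: Callable[[Any], None],
) -> Optional[Any]:
    """Emit and persist the same (output) state, never the raw input state."""
    output_state = process_state(state)
    if output_state is not None:
        emit(output_state)
        # Persist exactly what was emitted so storage cannot diverge from the stream.
        save(output_state)
    return output_state
```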
   



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +234,22 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {
+    try {
+      storageUris.foreach { uri =>
+        val writer = DocumentFactory
+          .openDocument(State.uriFromResultUri(uri))
+          ._1
+          .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+          .asInstanceOf[BufferedItemWriter[Tuple]]
+        writer.putOne(state.toTuple)
+        writer.close()
+      }
+    } catch {
+      case _: Exception => ()
+    }

Review Comment:
   `saveStateToStorageIfNeeded` wraps the entire loop in a broad 
`catch { case _: Exception => () }`, which can silently lose state 
materialization on any failure. Prefer handling the expected 'missing state 
storage' case explicitly and logging unexpected exceptions (possibly with the 
affected URI) so state loss is observable.
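
   The pattern could look roughly like this in Python. `StateStorageMissing` is 
a hypothetical exception type standing in for whatever the storage layer 
raises when no state document exists, and `write_state` is an assumed callback.

```python
import logging
from typing import Callable, Iterable, List

logger = logging.getLogger(__name__)


class StateStorageMissing(Exception):
    """Hypothetical marker for the expected 'no state storage for this URI' case."""


def save_state_to_all(uris: Iterable[str], write_state: Callable[[str], None]) -> List[str]:
    """Write state to each URI; return the URIs that failed unexpectedly."""
    failed: List[str] = []
    for uri in uris:
        try:
            write_state(uri)
        except StateStorageMissing:
            # Expected: this port has no state storage, so there is nothing to do.
            logger.debug("No state storage for %s; skipping", uri)
        except Exception:
            # Unexpected: record the URI and log the traceback so the loss is visible.
            logger.exception("Failed to materialize state to %s", uri)
            failed.append(uri)
    return failed
```

Handling each URI inside the loop also keeps one failing URI from skipping 
the remaining ones.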



##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -171,6 +175,24 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple, port_id=None) -> None:
                 PortStorageWriterElement(data_tuple=tuple_)
             )
 
+    def save_state_to_storage_if_needed(self, state: State, port_id=None) -> None:
+        if port_id is None:
+            uris = self._storage_uris.values()
+        elif port_id in self._storage_uris:
+            uris = [self._storage_uris[port_id]]
+        else:
+            return
+
+        for uri in uris:
+            state_uri = State.uri_from_result_uri(uri)
+            try:
+                document = DocumentFactory.open_document(state_uri)[0]
+            except ValueError:
+                document = DocumentFactory.create_document(state_uri, State.SCHEMA)
+            writer = document.writer(str(uuid.uuid4()))
+            writer.put_one(state.to_tuple())
+            writer.close()
+

Review Comment:
   `save_state_to_storage_if_needed` performs synchronous Iceberg open/create + 
write + close for each state (and uses a fresh UUID writer id each time). This 
can create many tiny commits/files and block the caller thread. Consider 
buffering state writes (e.g., a dedicated writer thread per port/URI like tuple 
storage) and reusing a stable writer identifier (e.g., worker index) to reduce 
overhead.
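
   A rough sketch of the buffering idea, using a dedicated thread that collects 
states and commits them once on close. `BufferedStateWriter` and the `commit` 
callback are hypothetical; in practice `commit` would wrap a single 
document-writer open/write/close against the state URI.

```python
import queue
import threading
from typing import Any, Callable, List

_STOP = object()  # sentinel telling the writer thread to finish


class BufferedStateWriter:
    """Collects states on a background thread and commits them in one batch."""

    def __init__(self, commit: Callable[[List[Any]], None]) -> None:
        self._queue: "queue.Queue[Any]" = queue.Queue()
        self._commit = commit
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def put(self, state: Any) -> None:
        # Non-blocking for the caller: the state is only enqueued here.
        self._queue.put(state)

    def close(self) -> None:
        # Called once, e.g. on port completion; waits for the single commit.
        self._queue.put(_STOP)
        self._thread.join()

    def _run(self) -> None:
        buffered: List[Any] = []
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            buffered.append(item)
        if buffered:
            self._commit(buffered)  # one commit for all buffered states
```

With this shape the caller pays only for a queue `put` per state, and the 
number of commits is one per writer rather than one per state.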



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala:
##########
@@ -57,6 +58,9 @@ object State {
 
   def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
 
+  def uriFromResultUri(resultUri: URI): URI =
+    new URI(resultUri.toString.replace("/result", "/state"))
+

Review Comment:
   `uriFromResultUri` uses a raw string replace on the full URI, which can 
produce incorrect results if "/result" appears elsewhere in the URI (or if the 
path doesn’t end with `/result`). Consider constructing the new URI by parsing 
the path segments and replacing only the final resource-type segment (or adding 
a dedicated `VFSURIFactory.createStateURI` helper).
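
   As an illustration in Python (the Python side has the analogous 
`State.uri_from_result_uri`), replacing only the final segment might look like 
the sketch below. It assumes the result URI has no query or fragment and always 
ends in `/result`; the function name and example URIs are made up.

```python
def state_uri_from_result_uri(result_uri: str) -> str:
    """Swap only the trailing 'result' path segment for 'state'."""
    prefix, sep, last = result_uri.rpartition("/")
    if not sep or last != "result":
        # Fail loudly instead of returning an unchanged or mangled URI.
        raise ValueError(f"expected a URI ending in /result, got: {result_uri}")
    return f"{prefix}/state"
```

An earlier `/result` in the path (for example an operator literally named 
`result`) is left untouched, and a URI that does not end in `/result` raises 
instead of silently pointing at the wrong document.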
   



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+
+      try {
+        val state_document =
+          DocumentFactory
+            .openDocument(State.uriFromResultUri(uri))
+            ._1
+            .asInstanceOf[VirtualDocument[Tuple]]
+        val stateReadIterator = state_document.get()
+
+        while (stateReadIterator.hasNext) {
+          val state = State.fromTuple(stateReadIterator.next())
+          inputMessageQueue.put(
+            FIFOMessageElement(WorkflowFIFOMessage(channelId, getSequenceNumber, StateFrame(state)))
+          )
+        }
+      } catch {
+        case _: Exception =>
+      }
+

Review Comment:
   The inner `try/catch` swallows all exceptions when reading state 
materialization. This can silently drop state on real failures (schema 
mismatch, permissions, IO errors). Catch only the expected 'missing state 
table' case and log/propagate unexpected exceptions so materialization bugs are 
diagnosable.



##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> typing.Iterator[DataFrame
             if receiver == self.worker_actor_id:
                 yield self.tuples_to_data_frame(tuples)
 
+    def emit_state_with_filter(self, state: State) -> typing.Iterator[StateFrame]:
+        for receiver, payload in self.partitioner.flush_state(state):
+            if receiver == self.worker_actor_id:
+                yield (
+                    StateFrame(payload)
+                    if isinstance(payload, State)
+                    else self.tuples_to_data_frame(payload)
+                )

Review Comment:
   `emit_state_with_filter` is annotated as returning `Iterator[StateFrame]`, 
but it can also yield `DataFrame` (when the partitioner flushes pending tuple 
batches). This makes the type hint incorrect and can break static analysis. 
Update the return type to an iterator over `DataPayload` (or 
`Union[StateFrame, DataFrame]`).
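
   A self-contained sketch of the corrected annotation. The `State`, 
`StateFrame`, and `DataFrame` dataclasses here are simplified stand-ins for the 
real classes, and the function takes the flushed `(receiver, payload)` pairs 
directly instead of calling a partitioner.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, List, Tuple, Union


@dataclass
class State:  # stand-in for the real State class
    content: dict


@dataclass
class StateFrame:  # stand-in
    frame: State


@dataclass
class DataFrame:  # stand-in
    frame: List[tuple]


StateOrDataFrame = Union[StateFrame, DataFrame]


def emit_state_with_filter(
    flushed: Iterable[Tuple[str, Union[State, List[tuple]]]],
    worker_actor_id: str,
) -> Iterator[StateOrDataFrame]:
    """Yield frames addressed to this worker; the hint covers both frame kinds."""
    for receiver, payload in flushed:
        if receiver == worker_actor_id:
            if isinstance(payload, State):
                yield StateFrame(payload)
            else:
                # Pending tuple batches flushed ahead of the state.
                yield DataFrame(payload)
```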



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to