Xiao-zhen-Liu commented on code in PR #4490:
URL: https://github.com/apache/texera/pull/4490#discussion_r3192837833


##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala:
##########
@@ -57,6 +58,9 @@ object State {
 
   def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
 
+  def uriFromResultUri(resultUri: URI): URI =

Review Comment:
   This helper is hacky. State and Result are sibling resources: both belong 
to the same `(workflowId, executionId, globalPortId)` and neither is derived 
from the other. Deriving one URI from another by string replacement also breaks 
if `/result` appears anywhere else in the URI (e.g. an operator named `result`).
   
   A cleaner approach is to add a `VFSURIFactory.createStateURI(workflowId, 
executionId, globalPortId)` that mirrors `createResultURI`, build state URIs 
through it, and delete this helper.
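
   A minimal Python sketch of the failure mode, for illustration only. The URI 
layout and the names `create_result_uri`, `create_state_uri`, and 
`state_uri_by_replacement` are assumptions, not the actual `VFSURIFactory` API, 
and the sketch assumes the helper rewrites every `/result` occurrence.

```python
def create_result_uri(workflow_id: int, execution_id: int, global_port_id: str) -> str:
    # Hypothetical layout; the real VFS URI format may differ.
    return f"vfs:///wid/{workflow_id}/eid/{execution_id}/globalportid/{global_port_id}/result"


def create_state_uri(workflow_id: int, execution_id: int, global_port_id: str) -> str:
    # Sibling factory: built from the same components, not from the result URI.
    return f"vfs:///wid/{workflow_id}/eid/{execution_id}/globalportid/{global_port_id}/state"


def state_uri_by_replacement(result_uri: str) -> str:
    # The fragile approach: rewrites every "/result" occurrence in the URI.
    return result_uri.replace("/result", "/state")


# Hypothetical port id that happens to contain "/result"
# (e.g. an operator named "result").
port_id = "op/result/port/0"
result_uri = create_result_uri(1, 2, port_id)

derived = state_uri_by_replacement(result_uri)  # operator segment is corrupted
built = create_state_uri(1, 2, port_id)         # operator segment is preserved
```

   Building both URIs from the same three components keeps the port id intact, 
while the replacement-based version also rewrites the operator segment.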



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -84,6 +89,19 @@ class InputPortMaterializationReaderThread(
     // Notify the input port of start of input channel
     emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
     try {
+      val stateDocument =
+        DocumentFactory
+          .openDocument(State.uriFromResultUri(uri))
+          ._1
+          .asInstanceOf[VirtualDocument[Tuple]]
+      val stateReadIterator = stateDocument.get()
+      while (stateReadIterator.hasNext) {
+        val state = State.fromTuple(stateReadIterator.next())
+        inputMessageQueue.put(

Review Comment:
   It looks like every state is sent to every worker, which matches our design 
decision. Please note this in the code documentation: the data-reading loop 
that follows filters by `partitioner.getBucketIndex`, while this loop 
intentionally enqueues every state to every worker. Without a note, the 
asymmetry reads like an oversight.
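
   To illustrate the asymmetry that deserves a comment, here is a small Python 
sketch. `read_materialization`, `run_worker`, and the modulo bucketing are 
hypothetical stand-ins, not the actual reader or partitioner code.

```python
from queue import Queue
from typing import Callable, Iterable, List, Tuple


def read_materialization(
    states: Iterable[str],
    tuples: Iterable[int],
    bucket_of: Callable[[int], int],
    worker_index: int,
    out: Queue,
) -> None:
    # States are broadcast: every worker enqueues every state, with no
    # partitioner check. This is intentional, not a missing filter.
    for state in states:
        out.put(("state", state))
    # Tuples are partitioned: a worker keeps only tuples in its own bucket.
    for t in tuples:
        if bucket_of(t) == worker_index:
            out.put(("tuple", t))


def run_worker(worker_index: int, num_workers: int) -> List[Tuple[str, object]]:
    out: Queue = Queue()
    read_materialization(
        ["s0", "s1"], range(6), lambda t: t % num_workers, worker_index, out
    )
    return [out.get() for _ in range(out.qsize())]


worker0 = run_worker(0, 2)
worker1 = run_worker(1, 2)
```

   Both workers receive `s0` and `s1`, but each receives only its own share of 
the tuples. A one-line comment above the state loop would make that explicit.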



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +124,7 @@ class InputPortMaterializationReaderThread(
       }
       // Flush any remaining tuples in the buffer.
       if (buffer.nonEmpty) flush()
+

Review Comment:
   Remove this unintentional blank line.



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -84,6 +89,19 @@ class InputPortMaterializationReaderThread(
     // Notify the input port of start of input channel
     emitECM(METHOD_START_CHANNEL, NO_ALIGNMENT)
     try {
+      val stateDocument =
+        DocumentFactory
+          .openDocument(State.uriFromResultUri(uri))
+          ._1
+          .asInstanceOf[VirtualDocument[Tuple]]
+      val stateReadIterator = stateDocument.get()
+      while (stateReadIterator.hasNext) {

Review Comment:
   Is the order in which we read the states vs. the tuples irrelevant? If so, 
this is a design decision that should be documented here.



##########
amber/src/main/python/core/models/state.py:
##########
@@ -41,6 +41,10 @@ def from_json(cls, payload: str) -> "State":
     def from_tuple(cls, row: Tuple) -> "State":
         return cls.from_json(row[cls.CONTENT])
 
+    @staticmethod
+    def uri_from_result_uri(result_uri: str) -> str:

Review Comment:
   Ditto: this has the same problem as the Scala `State.uriFromResultUri` 
helper.



##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -17,8 +17,8 @@
 
 import typing
 from loguru import logger
-from pyarrow import Table
 from typing import Union
+from pyarrow import Table

Review Comment:
   Unrelated import reorder — please revert to keep the diff focused on the 
state-materialization change.



##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -138,8 +147,15 @@ def run(self) -> None:
                 self.uri
             )
             self.emit_ecm("StartChannel", EmbeddedControlMessageType.NO_ALIGNMENT)
-            storage_iterator = self.materialization.get()
 
+            state_document, _ = DocumentFactory.open_document(

Review Comment:
   Ditto about the design documentation: please document the state-reading 
behavior here as well, as in the Scala reader.



##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) -> typing.Iterator[DataFrame
             if receiver == self.worker_actor_id:
                 yield self.tuples_to_data_frame(tuples)
 
+    def emit_state_with_filter(self, state: State) -> typing.Iterator[DataPayload]:

Review Comment:
   This Python implementation differs from the Scala implementation. It routes 
through `partitioner.flush_state` only to immediately filter the broadcast back 
down to this worker. Given the design that every worker reads every state, the 
partitioner detour is a no-op. Consider dropping this method and emitting 
`StateFrame(State.from_tuple(row))` directly in `run()`, mirroring the Scala 
reader. The current name (`_with_filter`) also implies routing logic that 
doesn't actually exist.
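
   A rough sketch of why the detour is a no-op. The `StateFrame` and 
`BroadcastPartitioner` classes below are simplified stand-ins, and the 
`(receiver, frame)` shape returned by `flush_state` is an assumption rather 
than the real partitioner signature.

```python
from dataclasses import dataclass
from typing import Iterator, List, Tuple


@dataclass(frozen=True)
class StateFrame:
    # Stand-in for the real StateFrame payload type.
    content: str


class BroadcastPartitioner:
    # Hypothetical partitioner whose flush_state sends the state to every receiver.
    def __init__(self, receivers: List[str]) -> None:
        self.receivers = receivers

    def flush_state(self, state: str) -> Iterator[Tuple[str, StateFrame]]:
        for receiver in self.receivers:
            yield receiver, StateFrame(state)


def emit_state_with_filter(
    partitioner: BroadcastPartitioner, state: str, me: str
) -> Iterator[StateFrame]:
    # Detour: broadcast to all receivers, then keep only this worker's copy.
    for receiver, frame in partitioner.flush_state(state):
        if receiver == me:
            yield frame


def emit_state_direct(state: str) -> Iterator[StateFrame]:
    # Direct: every worker reads every state, so just wrap and emit.
    yield StateFrame(state)


partitioner = BroadcastPartitioner(["worker-0", "worker-1", "worker-2"])
via_detour = list(emit_state_with_filter(partitioner, "loop_counter=3", "worker-1"))
direct = list(emit_state_direct("loop_counter=3"))
```

   Under these assumptions both paths yield exactly one identical frame per 
state, so the direct form is simpler and matches the Scala reader.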



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +239,10 @@ class OutputManager(
     })
   }
 
+  def saveStateToStorageIfNeeded(state: State): Unit = {

Review Comment:
   This writes the same state into every output port's state table. For 
multi-output-port operators that's a fan-out by N. Is this intended? If yes, 
please add a comment explaining the behavior so it's not read as a bug. Same 
goes for Python.
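
   For reference, the behavior in question looks roughly like the following 
Python sketch. `InMemoryStateWriter` and the function signature are 
hypothetical; only the once-per-port write pattern is taken from the diff.

```python
from typing import Dict, List


class InMemoryStateWriter:
    # Hypothetical stand-in for a per-port state-table writer.
    def __init__(self) -> None:
        self.rows: List[str] = []

    def put_one(self, row: str) -> None:
        self.rows.append(row)


def save_state_to_storage_if_needed(
    state: str, state_writers: Dict[str, InMemoryStateWriter]
) -> int:
    # Writes the same state once per output port (N ports => N copies).
    # Whether this fan-out is intended is the open question in this review.
    for writer in state_writers.values():
        writer.put_one(state)
    return len(state_writers)


writers = {
    "port-0": InMemoryStateWriter(),
    "port-1": InMemoryStateWriter(),
    "port-2": InMemoryStateWriter(),
}
copies = save_state_to_storage_if_needed("loop_counter=3", writers)
```

   With three output ports, one emitted state produces three stored rows.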



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -245,7 +256,7 @@ class OutputManager(
         writerThread.join()
       case None =>
     }
-
+    this.stateWriters.remove(outputPortId).foreach(_.close())

Review Comment:
   Result tuples are written via a dedicated `OutputPortResultWriterThread` so 
the DP thread isn't blocked on Iceberg I/O. State writes here run synchronously 
on the caller's thread (DataProcessor), so every `putOne`, and eventually every 
buffer flush or commit, stalls the DP thread. For loops that emit state 
frequently this becomes a real cost. Please mirror the result path with an 
`OutputPortStateWriterThread` (or generalize the existing one) so that the DP 
thread only enqueues.
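
   The pattern I have in mind, sketched in Python for brevity. 
`StateWriterThread`, its `enqueue`/`close` methods, and the simulated write 
delay are all hypothetical; the real version would wrap the Iceberg-backed 
writer the way the result writer thread does.

```python
import queue
import threading
import time
from typing import List, Optional


class StateWriterThread(threading.Thread):
    # Hypothetical analogue of a dedicated writer thread: drains a queue and
    # performs slow storage writes off the caller's thread.
    def __init__(self, sink: List[str], write_delay_s: float = 0.01) -> None:
        super().__init__(daemon=True)
        self._pending: "queue.Queue[Optional[str]]" = queue.Queue()
        self._sink = sink
        self._write_delay_s = write_delay_s

    def enqueue(self, state: str) -> None:
        # Called by the data-processing thread; returns without waiting on I/O.
        self._pending.put(state)

    def close(self) -> None:
        # A None sentinel asks the thread to stop after draining earlier items.
        self._pending.put(None)
        self.join()

    def run(self) -> None:
        while True:
            item = self._pending.get()
            if item is None:
                return
            time.sleep(self._write_delay_s)  # simulate a slow storage write
            self._sink.append(item)


written: List[str] = []
writer = StateWriterThread(written)
writer.start()
for i in range(5):
    writer.enqueue(f"state-{i}")
writer.close()  # blocks only here, at port close, until all writes finish
```

   The caller pays only the cost of a queue `put` per state; the blocking wait 
moves to `close()`, which matches where `writerThread.join()` already sits for 
results.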



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
