Copilot commented on code in PR #4490:
URL: https://github.com/apache/texera/pull/4490#discussion_r3185360832
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +234,22 @@ class OutputManager(
})
}
+ def saveStateToStorageIfNeeded(state: State): Unit = {
+ try {
+ storageUris.foreach { uri =>
+ val writer = DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+ .asInstanceOf[BufferedItemWriter[Tuple]]
+ writer.putOne(state.toTuple)
+ writer.close()
+ }
Review Comment:
`saveStateToStorageIfNeeded` opens the state document and creates (then closes) a
new writer for every state and every output storage URI. Because Iceberg commits happen on
`close()`, this can create many small files/snapshots and add significant
overhead. Consider reusing a long-lived buffered writer (similar to tuple
storage writer threads) and flushing on port completion instead of per state.
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
}
// Flush any remaining tuples in the buffer.
if (buffer.nonEmpty) flush()
+
+ try {
+ val state_document =
+ DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .asInstanceOf[VirtualDocument[Tuple]]
+ val stateReadIterator = state_document.get()
+
+ while (stateReadIterator.hasNext) {
+ val state = State.fromTuple(stateReadIterator.next())
+ inputMessageQueue.put(
+ FIFOMessageElement(WorkflowFIFOMessage(channelId,
getSequenceNumber, StateFrame(state)))
+ )
+ }
Review Comment:
Materialized state is enqueued only after all tuples have been read and flushed,
which means the receiving worker observes state updates only after the entire data stream.
Operators like `IfOpExec` rely on state to affect how subsequent tuples are
routed/processed, so state should be emitted before (or otherwise in a
well-defined order relative to) the tuples read from materialization.
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala:
##########
@@ -126,6 +126,7 @@ class DataProcessor(
val outputState = executor.processState(state, port)
if (outputState.isDefined) {
outputManager.emitState(outputState.get)
+ outputManager.saveStateToStorageIfNeeded(state)
Review Comment:
`saveStateToStorageIfNeeded` is called with the *input* `state`, but the
executor may transform/drop state via `processState`. This likely materializes
the wrong state (and can diverge from what was actually emitted). Pass the
emitted `outputState.get` (or otherwise clarify which state should be
persisted).
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala:
##########
@@ -232,6 +234,22 @@ class OutputManager(
})
}
+ def saveStateToStorageIfNeeded(state: State): Unit = {
+ try {
+ storageUris.foreach { uri =>
+ val writer = DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .writer(VirtualIdentityUtils.getWorkerIndex(actorId).toString)
+ .asInstanceOf[BufferedItemWriter[Tuple]]
+ writer.putOne(state.toTuple)
+ writer.close()
+ }
+ } catch {
+ case _: Exception => ()
+ }
Review Comment:
`saveStateToStorageIfNeeded` wraps the entire loop in a broad `catch { case
_: Exception => () }`, which can silently lose state materialization on any
failure. Prefer handling the expected 'missing state storage' case explicitly
and logging unexpected exceptions (possibly with the affected URI) so state
loss is observable.
##########
amber/src/main/python/core/architecture/packaging/output_manager.py:
##########
@@ -171,6 +175,24 @@ def save_tuple_to_storage_if_needed(self, tuple_: Tuple,
port_id=None) -> None:
PortStorageWriterElement(data_tuple=tuple_)
)
+ def save_state_to_storage_if_needed(self, state: State, port_id=None) ->
None:
+ if port_id is None:
+ uris = self._storage_uris.values()
+ elif port_id in self._storage_uris:
+ uris = [self._storage_uris[port_id]]
+ else:
+ return
+
+ for uri in uris:
+ state_uri = State.uri_from_result_uri(uri)
+ try:
+ document = DocumentFactory.open_document(state_uri)[0]
+ except ValueError:
+ document = DocumentFactory.create_document(state_uri,
State.SCHEMA)
+ writer = document.writer(str(uuid.uuid4()))
+ writer.put_one(state.to_tuple())
+ writer.close()
+
Review Comment:
`save_state_to_storage_if_needed` performs synchronous Iceberg open/create +
write + close for each state (and uses a fresh UUID writer id each time). This
can create many tiny commits/files and block the caller thread. Consider
buffering state writes (e.g., a dedicated writer thread per port/URI like tuple
storage) and reusing a stable writer identifier (e.g., worker index) to reduce
overhead.
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala:
##########
@@ -57,6 +58,9 @@ object State {
def fromTuple(row: Tuple): State = fromJson(row.getField[String](Content))
+ def uriFromResultUri(resultUri: URI): URI =
+ new URI(resultUri.toString.replace("/result", "/state"))
+
Review Comment:
`uriFromResultUri` uses a raw string replace on the full URI, and `String.replace`
substitutes every occurrence, so it can produce incorrect results if "/result" appears
elsewhere in the URI (or if the path doesn’t end with `/result`). Consider constructing
the path segments and replacing only the final resource-type segment (or adding
a dedicated `VFSURIFactory.createStateURI` helper).
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala:
##########
@@ -106,6 +111,25 @@ class InputPortMaterializationReaderThread(
}
// Flush any remaining tuples in the buffer.
if (buffer.nonEmpty) flush()
+
+ try {
+ val state_document =
+ DocumentFactory
+ .openDocument(State.uriFromResultUri(uri))
+ ._1
+ .asInstanceOf[VirtualDocument[Tuple]]
+ val stateReadIterator = state_document.get()
+
+ while (stateReadIterator.hasNext) {
+ val state = State.fromTuple(stateReadIterator.next())
+ inputMessageQueue.put(
+ FIFOMessageElement(WorkflowFIFOMessage(channelId,
getSequenceNumber, StateFrame(state)))
+ )
+ }
+ } catch {
+ case _: Exception =>
+ }
+
Review Comment:
The inner `try/catch` swallows all exceptions when reading the materialized
state. This can silently drop state on real failures (schema
mismatch, permissions, IO errors). Catch only the expected 'missing state
table' case and log/propagate unexpected exceptions so materialization bugs are
diagnosable.
##########
amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py:
##########
@@ -125,6 +125,15 @@ def tuple_to_batch_with_filter(self, tuple_: Tuple) ->
typing.Iterator[DataFrame
if receiver == self.worker_actor_id:
yield self.tuples_to_data_frame(tuples)
+ def emit_state_with_filter(self, state: State) ->
typing.Iterator[StateFrame]:
+ for receiver, payload in self.partitioner.flush_state(state):
+ if receiver == self.worker_actor_id:
+ yield (
+ StateFrame(payload)
+ if isinstance(payload, State)
+ else self.tuples_to_data_frame(payload)
+ )
Review Comment:
`emit_state_with_filter` is annotated as returning `Iterator[StateFrame]`,
but it can also yield `DataFrame` (when the partitioner flushes pending tuple
batches). This makes the type hint incorrect and can break static analysis.
Update the return type to an iterator over `DataPayload` (or `Union[StateFrame,
DataFrame]`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
users@infra.apache.org