aglinxinyuan commented on code in PR #5707:
URL: https://github.com/apache/texera/pull/5707#discussion_r3410466854
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala:
##########
@@ -198,6 +198,12 @@ case class PhysicalOp(
// schema propagation function
propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas =>
schemas),
isOneToManyOp: Boolean = false,
+ // Whether to reuse this operator's existing output storage instead of
+ // recreating it when its region re-executes, so output accumulated by
+ // earlier runs (e.g. across loop iterations) survives. Named after the
+ // behavior the scheduler checks, not the operator that sets it, so any
+ // future operator needing the same treatment can reuse it.
+ reusesOutputStorageOnReExecution: Boolean = false,
Review Comment:
Great questions.
**(1) Counter-case for needing *fresh* storage on re-execution — yes.** The
loop back-edge (`JumpToOperatorRegion`) does not just re-run LoopEnd; it
rewinds the schedule to LoopStart's level and re-executes **every** region from
there — LoopStart, every loop-body operator, and LoopEnd. On that one shared
event:
- LoopStart (emits `table.iloc[i]`) and every loop-body operator must emit
only the current iteration's output, so their document must be **recreated
fresh** each iteration (the default).
- only LoopEnd accumulates, so it opts into **reuse/append**.
If we reused-if-exists unconditionally, LoopStart and every loop-body
operator would wrongly accumulate stale rows across iterations — the
`LoopIntegrationSpec` row counts (LoopStart per-iteration; LoopEnd = 3 single /
9 nested) depend on exactly this distinction. Recreate-by-default already gives
fresh-per-iteration for free; LoopEnd just opts out.
**(2) Moved it to the port.** Agreed — done in `45fc475`: the flag is now
`OutputPort.reusesOutputStorage` (proto, next to `blocking`/`mode`), set on
LoopEnd's output port; the scheduler reads it per output port. Removed from
`PhysicalOp`.
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -60,10 +60,43 @@ import org.apache.texera.web.SessionState
import org.apache.texera.web.model.websocket.event.RegionStateEvent
import
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
+import java.net.URI
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{Duration => ScalaDuration}
+object RegionExecutionCoordinator {
+
+ /**
+ * Decide whether to (re)create the output document at `uri`, then act.
+ *
+ * When `reuseExistingStorage` is set and the document already exists, the
+ * existing document is kept untouched -- this is how an operator whose
+ * region re-executes (e.g. LoopEnd, which accumulates output across loop
+ * iterations) avoids clobbering output an earlier run produced, since
+ * `createDocument` overrides any existing document. Otherwise the document
+ * is created.
+ *
+ * `documentExists` / `createDocument` are injected so the create-or-reuse
+ * decision can be unit-tested without an iceberg backend or a live region.
+ *
+ * @return true iff `createDocument` was invoked.
+ */
+ def provisionOutputDocument(
+ uri: URI,
+ reuseExistingStorage: Boolean,
+ documentExists: URI => Boolean,
+ createDocument: URI => Unit
+ ): Boolean = {
+ if (reuseExistingStorage && documentExists(uri)) {
+ false
+ } else {
+ createDocument(uri)
+ true
+ }
+ }
Review Comment:
Agreed — moved the create-or-reuse decision out of
`RegionExecutionCoordinator` into `DocumentFactory.createOrReuseDocument` (it
only composes `documentExists` + `createDocument`, both already on
`DocumentFactory`). The coordinator now just calls it. (`45fc475`)
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +609,29 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is
missing"))
- DocumentFactory.createDocument(resultURI, schema)
- DocumentFactory.createDocument(stateURI, State.schema)
+ // Operators that reuse their output storage across region re-runs
+ // (e.g. LoopEnd, whose output accumulates across the iterations of its
+ // own loop) already have their result/state documents from a prior
+ // run; on re-execution `createDocument` (overrideIfExists=true) would
+ // clobber them, so reuse the existing document when it is already
+ // there. (The inner LoopEnd of a nested loop additionally drops its
+ // output once per outer iteration -- on the Python worker side in
+ // MainLoop._process_state_frame -- which is orthogonal to this
+ // region-provisioning reuse.)
+ // Decided per the operator that OWNS this port, not region-wide: a
+ // region mixing a reuse op (LoopEnd) with others must still recreate
+ // the others' documents on re-execution.
+ val reusesOutputStorage =
+
region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution
+ Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+ case (uri, sch) =>
+ RegionExecutionCoordinator.provisionOutputDocument(
+ uri,
+ reusesOutputStorage,
+ DocumentFactory.documentExists,
+ u => DocumentFactory.createDocument(u, sch)
+ )
+ }
Review Comment:
The differentiation itself is needed — see the counter-case on the
`PhysicalOp` thread: LoopStart and every loop-body operator re-execute on the
same back-edge as LoopEnd but must produce **fresh** output each iteration, so
an unconditional append-if-exists would make them accumulate stale rows.
But you're right that the coordinator shouldn't hold that state — as of
`45fc475` it no longer does. The flag now lives on `OutputPort` (declarative,
per-port) and the create-or-reuse decision lives in `DocumentFactory`; the
coordinator just reads the port's flag and calls the storage helper.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]