Yicong-Huang commented on code in PR #5707:
URL: https://github.com/apache/texera/pull/5707#discussion_r3409946385
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -60,10 +60,43 @@ import org.apache.texera.web.SessionState
import org.apache.texera.web.model.websocket.event.RegionStateEvent
import org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
+import java.net.URI
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.concurrent.duration.{Duration => ScalaDuration}
+object RegionExecutionCoordinator {
+
+ /**
+ * Decide whether to (re)create the output document at `uri`, then act.
+ *
+ * When `reuseExistingStorage` is set and the document already exists, the
+ * existing document is kept untouched -- this is how an operator whose
+ * region re-executes (e.g. LoopEnd, which accumulates output across loop
+ * iterations) avoids clobbering output an earlier run produced, since
+ * `createDocument` overrides any existing document. Otherwise the document
+ * is created.
+ *
+ * `documentExists` / `createDocument` are injected so the create-or-reuse
+ * decision can be unit-tested without an iceberg backend or a live region.
+ *
+ * @return true iff `createDocument` was invoked.
+ */
+ def provisionOutputDocument(
+ uri: URI,
+ reuseExistingStorage: Boolean,
+ documentExists: URI => Boolean,
+ createDocument: URI => Unit
+ ): Boolean = {
+ if (reuseExistingStorage && documentExists(uri)) {
+ false
+ } else {
+ createDocument(uri)
+ true
+ }
+ }
Review Comment:
I find this logic a bit detached from "ExecutionCoordinator". It is about
storage, so maybe this is the wrong place for the method?
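Because `documentExists` and `createDocument` are injected, the create-or-reuse decision can be exercised against an in-memory fake. Below is a minimal sketch that assumes a mutable `Set[URI]` stands in for the document store. The object name `OutputStorageProvisioning`, the `provision` and `fakeCreate` helpers, and the example URIs are all hypothetical. The method body is copied from `provisionOutputDocument` in the diff. Putting it in a storage-named object is only one illustration of the relocation suggested above, not a settled design.

```scala
import java.net.URI
import scala.collection.mutable

// Hypothetical storage-side home for the helper; the body is copied from
// RegionExecutionCoordinator.provisionOutputDocument in the diff.
object OutputStorageProvisioning {
  def provisionOutputDocument(
      uri: URI,
      reuseExistingStorage: Boolean,
      documentExists: URI => Boolean,
      createDocument: URI => Unit
  ): Boolean = {
    if (reuseExistingStorage && documentExists(uri)) {
      false // keep the document an earlier run produced
    } else {
      createDocument(uri)
      true
    }
  }
}

// In-memory fake: `existing` plays the document store, `created` logs every
// createDocument call (an assumption, not an iceberg API).
val existing = mutable.Set.empty[URI]
val created = mutable.ListBuffer.empty[URI]
def fakeCreate(uri: URI): Unit = { existing += uri; created += uri }

def provision(uri: URI, reuse: Boolean): Boolean =
  OutputStorageProvisioning.provisionOutputDocument(
    uri,
    reuse,
    u => existing.contains(u),
    fakeCreate
  )

// Made-up URIs for a reuse operator (e.g. LoopEnd) and an ordinary operator.
val loopEndResult = URI.create("file:///tmp/loopEnd/result")
val otherResult = URI.create("file:///tmp/otherOp/result")

// First execution: nothing exists yet, so both documents are created.
val firstLoopEnd = provision(loopEndResult, reuse = true) // true
val firstOther = provision(otherResult, reuse = false) // true
// Re-execution: the reuse operator keeps its document, the other recreates.
val rerunLoopEnd = provision(loopEndResult, reuse = true) // false
val rerunOther = provision(otherResult, reuse = false) // true
```

The re-execution calls show the per-owning-operator behavior from the diff. Within one region, the reuse operator keeps its document while the other operator's document is recreated.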
##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala:
##########
@@ -198,6 +198,12 @@ case class PhysicalOp(
// schema propagation function
propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas =>
schemas),
isOneToManyOp: Boolean = false,
+ // Whether to reuse this operator's existing output storage instead of
+ // recreating it when its region re-executes, so output accumulated by
+ // earlier runs (e.g. across loop iterations) survives. Named after the
+ // behavior the scheduler checks, not the operator that sets it, so any
+ // future operator needing the same treatment can reuse it.
+ reusesOutputStorageOnReExecution: Boolean = false,
Review Comment:
Two questions:
1. Why can't this simply be true for all ports? I assumed only the operators
that will be re-executed would have this property, right? If they are
re-executed (e.g., by a for loop), I feel they should by default use the same
storage if it already exists. Can you provide a counter-case that needs new
storage during re-execution?
2. If some ports do need new storage during re-execution, I think this
property should not live on the PhysicalOp but on the port. Storage behavior
should be port-specific.
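Below is a minimal sketch of the port-level alternative in question 2. It assumes hypothetical `OutputPortSpec` and `OpSpec` case classes in place of Texera's real port and operator types. The flag name `reuseStorageOnReExecution` and the second LoopEnd-like port that opts out are illustrative only. The decision reads the flag from each port rather than from the owning operator.

```scala
// Hypothetical port/operator descriptors; Texera's real PhysicalOp and
// output-port types are not modeled here.
final case class OutputPortSpec(portId: Int, reuseStorageOnReExecution: Boolean = false)
final case class OpSpec(opId: String, outputPorts: Seq[OutputPortSpec])

// On re-execution, list the (opId, portId) documents to recreate: a port keeps
// its document only if that port opts in and the document already exists.
def portsToRecreate(
    ops: Seq[OpSpec],
    alreadyProvisioned: Set[(String, Int)]
): Seq[(String, Int)] =
  for {
    op <- ops
    port <- op.outputPorts
    key = (op.opId, port.portId)
    if !(port.reuseStorageOnReExecution && alreadyProvisioned.contains(key))
  } yield key

// Illustrative region: a LoopEnd-like op with one reusing port and one that
// opts out (hypothetical), plus an ordinary op with default settings.
val region = Seq(
  OpSpec("loopEnd", Seq(OutputPortSpec(0, reuseStorageOnReExecution = true), OutputPortSpec(1))),
  OpSpec("other", Seq(OutputPortSpec(0)))
)
val provisioned = Set(("loopEnd", 0), ("loopEnd", 1), ("other", 0))
val toRecreate = portsToRecreate(region, provisioned)
// toRecreate == List(("loopEnd", 1), ("other", 0))
```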
##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +609,29 @@ class RegionExecutionCoordinator(
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
val schema =
schemaOptional.getOrElse(throw new IllegalStateException("Schema is missing"))
- DocumentFactory.createDocument(resultURI, schema)
- DocumentFactory.createDocument(stateURI, State.schema)
+ // Operators that reuse their output storage across region re-runs
+ // (e.g. LoopEnd, whose output accumulates across the iterations of its
+ // own loop) already have their result/state documents from a prior
+ // run; on re-execution `createDocument` (overrideIfExists=true) would
+ // clobber them, so reuse the existing document when it is already
+ // there. (The inner LoopEnd of a nested loop additionally drops its
+ // output once per outer iteration -- on the Python worker side in
+ // MainLoop._process_state_frame -- which is orthogonal to this
+ // region-provisioning reuse.)
+ // Decided per the operator that OWNS this port, not region-wide: a
+ // region mixing a reuse op (LoopEnd) with others must still recreate
+ // the others' documents on re-execution.
+ val reusesOutputStorage =
+ region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution
+ Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+ case (uri, sch) =>
+ RegionExecutionCoordinator.provisionOutputDocument(
+ uri,
+ reusesOutputStorage,
+ DocumentFactory.documentExists,
+ u => DocumentFactory.createDocument(u, sch)
+ )
+ }
Review Comment:
Do we have to differentiate between storage that should be reused and storage
that should not? A simpler design would be: if the port already has a
document/storage, simply append new tuples to it without creating a new
document. If we go this way, there is no need for the ExecutionCoordinator to
maintain such state (whether an output port is suitable for storage reuse).
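Below is a minimal sketch of the append-if-exists design. It assumes a hypothetical `InMemoryStore` keyed by URI, with tuples modeled as strings. None of these names are Texera, `DocumentFactory`, or iceberg APIs. On each (re-)execution, the port's document is opened if it exists and new tuples are appended, so the coordinator keeps no per-port reuse state.

```scala
import java.net.URI
import scala.collection.mutable

// Hypothetical in-memory document store; tuples are modeled as strings.
final class InMemoryStore {
  private val docs = mutable.Map.empty[URI, mutable.ArrayBuffer[String]]

  // Return the existing document for `uri`, or create an empty one.
  def openOrCreate(uri: URI): mutable.ArrayBuffer[String] =
    docs.getOrElseUpdate(uri, mutable.ArrayBuffer.empty[String])

  // Snapshot of a document's tuples (empty if it was never created).
  def read(uri: URI): Seq[String] = docs.get(uri).map(_.toList).getOrElse(Nil)
}

// One region (re-)execution appending its output tuples to a port's document.
def runRegion(store: InMemoryStore, uri: URI, tuples: Seq[String]): Unit =
  store.openOrCreate(uri) ++= tuples

val store = new InMemoryStore
val loopEndResult = URI.create("file:///tmp/loopEnd/result") // made-up URI
runRegion(store, loopEndResult, Seq("iter1-a", "iter1-b"))
runRegion(store, loopEndResult, Seq("iter2-a"))
// store.read(loopEndResult) == List("iter1-a", "iter1-b", "iter2-a")
```

Under this sketch, every re-executed port keeps its earlier tuples. Whether that is acceptable for all ports is the open question raised in the previous comment.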