Yicong-Huang commented on code in PR #5707:
URL: https://github.com/apache/texera/pull/5707#discussion_r3409946385


##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -60,10 +60,43 @@ import org.apache.texera.web.SessionState
 import org.apache.texera.web.model.websocket.event.RegionStateEvent
 import 
org.apache.texera.web.resource.dashboard.user.workflow.WorkflowExecutionsResource
 
+import java.net.URI
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicReference
 import scala.concurrent.duration.{Duration => ScalaDuration}
 
+object RegionExecutionCoordinator {
+
+  /**
+    * Decide whether to (re)create the output document at `uri`, then act.
+    *
+    * When `reuseExistingStorage` is set and the document already exists, the
+    * existing document is kept untouched -- this is how an operator whose
+    * region re-executes (e.g. LoopEnd, which accumulates output across loop
+    * iterations) avoids clobbering output an earlier run produced, since
+    * `createDocument` overrides any existing document. Otherwise the document
+    * is created.
+    *
+    * `documentExists` / `createDocument` are injected so the create-or-reuse
+    * decision can be unit-tested without an iceberg backend or a live region.
+    *
+    * @return true iff `createDocument` was invoked.
+    */
+  def provisionOutputDocument(
+      uri: URI,
+      reuseExistingStorage: Boolean,
+      documentExists: URI => Boolean,
+      createDocument: URI => Unit
+  ): Boolean = {
+    if (reuseExistingStorage && documentExists(uri)) {
+      false
+    } else {
+      createDocument(uri)
+      true
+    }
+  }

Review Comment:
   I find this logic a bit detached from "ExecutionCoordinator". this is about 
storage, maybe a wrong place to put the method?



##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/workflow/PhysicalOp.scala:
##########
@@ -198,6 +198,12 @@ case class PhysicalOp(
     // schema propagation function
     propagateSchema: SchemaPropagationFunc = SchemaPropagationFunc(schemas => 
schemas),
     isOneToManyOp: Boolean = false,
+    // Whether to reuse this operator's existing output storage instead of
+    // recreating it when its region re-executes, so output accumulated by
+    // earlier runs (e.g. across loop iterations) survives. Named after the
+    // behavior the scheduler checks, not the operator that sets it, so any
+    // future operator needing the same treatment can reuse it.
+    reusesOutputStorageOnReExecution: Boolean = false,

Review Comment:
   two questions:
   1. why can't this simply be true for all ports? I assumed only the operators 
that will be reexecuted would have this property right? then if they were to be 
 re-executed (e.g., by for loop), then I feel by default they should be using 
the same storage if that's already there. Can you provide a counter case that 
needs a new storage during re-execution? 
   2. If there are some ports needs new storages during re-execution, I think 
this property should not be on the PhysicalOp, but on the port. storage 
behavior should be port specific. 



##########
amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala:
##########
@@ -576,8 +609,29 @@ class RegionExecutionCoordinator(
           
region.getOperator(outputPortId.opId).outputPorts(outputPortId.portId)._3
         val schema =
           schemaOptional.getOrElse(throw new IllegalStateException("Schema is 
missing"))
-        DocumentFactory.createDocument(resultURI, schema)
-        DocumentFactory.createDocument(stateURI, State.schema)
+        // Operators that reuse their output storage across region re-runs
+        // (e.g. LoopEnd, whose output accumulates across the iterations of its
+        // own loop) already have their result/state documents from a prior
+        // run; on re-execution `createDocument` (overrideIfExists=true) would
+        // clobber them, so reuse the existing document when it is already
+        // there. (The inner LoopEnd of a nested loop additionally drops its
+        // output once per outer iteration -- on the Python worker side in
+        // MainLoop._process_state_frame -- which is orthogonal to this
+        // region-provisioning reuse.)
+        // Decided per the operator that OWNS this port, not region-wide: a
+        // region mixing a reuse op (LoopEnd) with others must still recreate
+        // the others' documents on re-execution.
+        val reusesOutputStorage =
+          
region.getOperator(outputPortId.opId).reusesOutputStorageOnReExecution
+        Seq((resultURI, schema), (stateURI, State.schema)).foreach {
+          case (uri, sch) =>
+            RegionExecutionCoordinator.provisionOutputDocument(
+              uri,
+              reusesOutputStorage,
+              DocumentFactory.documentExists,
+              u => DocumentFactory.createDocument(u, sch)
+            )
+        }

Review Comment:
   do we have to differentiate storage that to be reused or not?  A simpler 
design would be if the port already has a document/storage, simply append new 
tuples to it, without creating a new document. If we go this way, there is no 
need for executionCoordinator to maintain such as state (whether a output port 
is suitable for reused storage)?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to