Copilot commented on code in PR #5712:
URL: https://github.com/apache/texera/pull/5712#discussion_r3409127802


##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -78,6 +79,66 @@ object TestUtils {
     )
   }
 
+  /**
+    * Resolve and read each operator's external RESULT document at 
`executionId`,
+    * applying `extract` to the opened document. Operators with no external
+    * RESULT uri (e.g. one whose output wasn't materialized) are omitted. 
Shared
+    * by the e2e specs so the lookup-open-extract block doesn't drift between
+    * copies.
+    */
+  def readMaterializedResults[T](
+      executionId: ExecutionIdentity,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T
+  ): Map[OperatorIdentity, T] =
+    operatorIds.flatMap { opId =>
+      getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri 
=>
+        opId -> extract(
+          
DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+        )
+      }
+    }.toMap
+
+  /**
+    * Run `workflow` to COMPLETED, then read the requested operators' 
materialized
+    * results via `readMaterializedResults`. A FatalError aborts the run and is
+    * surfaced as the exception from the completion await. Shared by the simple
+    * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
+    * specs that drive the run differently (e.g. reconfiguration's 
pause/resume)
+    * call `readMaterializedResults` directly inside their own completion 
callback.
+    */
+  def runWorkflowAndReadResults[T](
+      system: ActorSystem,
+      workflow: Workflow,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T,
+      completionTimeout: Duration = Duration.fromMinutes(1)
+  ): Map[OperatorIdentity, T] = {
+    val client = new AmberClient(
+      system,
+      workflow.context,
+      workflow.physicalPlan,
+      ControllerConfig.default,
+      _ => {}
+    )
+    val completion = Promise[Unit]()
+    var results: Map[OperatorIdentity, T] = Map.empty
+    client.registerCallback[FatalError](evt => {
+      completion.setException(evt.e)
+      client.shutdown()
+    })
+    client.registerCallback[ExecutionStateUpdate](evt => {
+      if (evt.state == COMPLETED) {
+        results = readMaterializedResults(workflow.context.executionId, 
operatorIds, extract)
+        completion.setDone()
+      }
+    })
+    Await.result(client.controllerInterface.startWorkflow(EmptyRequest(), ()))
+    Await.result(completion, completionTimeout)
+    client.shutdown()
+    results
+  }

Review Comment:
   `runWorkflowAndReadResults` can leak the AmberClient and its actors in two cases. One is when the completion await times out. The other is when any exception is thrown inside the callbacks, because AmberClient's `errorHandler` (currently `_ => {}`) swallows such exceptions. Also, `setDone` / `setException` throw if the promise has already been completed. A repeated event therefore raises an exception that is silently dropped, which can turn a real failure into a later timeout.
   
   Consider making the Promise carry the actual results, wiring AmberClient’s 
`errorHandler` to fail the promise, using `updateIfEmpty` to avoid 
double-completion, and ensuring `client.shutdown()` runs in a `finally` block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to