Yicong-Huang commented on code in PR #5712:
URL: https://github.com/apache/texera/pull/5712#discussion_r3409967309


##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -78,6 +79,73 @@ object TestUtils {
     )
   }
 
+  /**
+    * Resolve and read each operator's external RESULT document at `executionId`,
+    * applying `extract` to the opened document. Operators with no external
+    * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared
+    * by the e2e specs so the lookup-open-extract block doesn't drift between
+    * copies.
+    */
+  def readMaterializedResults[T](
+      executionId: ExecutionIdentity,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T
+  ): Map[OperatorIdentity, T] =
+    operatorIds.flatMap { opId =>
+      getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri =>
+        opId -> extract(
+          DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+        )
+      }
+    }.toMap
+
+  /**
+    * Run `workflow` to COMPLETED, then read the requested operators' materialized
+    * results via `readMaterializedResults`. A FatalError aborts the run and is
+    * surfaced as the exception from the completion await. Shared by the simple
+    * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);

Review Comment:
   Is it shared by `LoopIntegrationSpec`? Maybe it's not committed in this PR?
   
   Maybe it's simpler to just not mention where this is used.



##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala:
##########
@@ -101,54 +95,13 @@ class DataProcessingSpec
     TestKit.shutdownActorSystem(system)
   }
 
-  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = {
-    var results: Map[OperatorIdentity, List[Tuple]] = null
-    val client = new AmberClient(
+  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+    runWorkflowAndReadResults(
       system,
-      workflow.context,
-      workflow.physicalPlan,
-      ControllerConfig.default,
-      error => {}
+      workflow,
+      workflow.logicalPlan.getTerminalOperatorIds,
+      _.get().toList
     )

Review Comment:
   Move this to the util as well?
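One way to act on this, sketched with hypothetical stand-in types (a `Workflow` carrying canned per-operator outputs, string rows instead of `Tuple`) rather than the engine's real ones: if the shared runner takes `workflow` in its own parameter list, a later parameter list can default `operatorIds` to the workflow's terminal operators, and the per-spec `executeWorkflow` shrinks to a one-line helper that could live in `TestUtils`. The real helper would also need the `ActorSystem` and the `VirtualDocument[Tuple]` extractor (`_.get().toList`); both are omitted here.

```scala
object RunHelperSketch {
  final case class OperatorIdentity(id: String)

  // Hypothetical stand-in for Workflow: its terminal operators plus canned
  // per-operator outputs in place of a real execution.
  final case class Workflow(
      terminalOperatorIds: List[OperatorIdentity],
      outputs: Map[OperatorIdentity, List[String]]
  )

  // Defaults in a later parameter list may refer to parameters of an earlier
  // one, so `operatorIds` can fall back to the workflow's terminal operators.
  def runWorkflowAndReadResults[T](workflow: Workflow)(
      extract: List[String] => T,
      operatorIds: Iterable[OperatorIdentity] = workflow.terminalOperatorIds
  ): Map[OperatorIdentity, T] =
    // Stand-in for "run to COMPLETED, then read": look up canned outputs and
    // skip operators that have none, mirroring the omit-if-no-URI rule.
    operatorIds.flatMap { op =>
      workflow.outputs.get(op).map(rows => op -> extract(rows))
    }.toMap

  // Hypothetical shared replacement for the per-spec `executeWorkflow`.
  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[String]] =
    runWorkflowAndReadResults[List[String]](workflow)(rows => rows)
}
```

A generic `extract` cannot itself take a default (something like `rows => rows` does not type-check against an arbitrary `T`), which is why the sketch keeps it required and puts the list-returning default in `executeWorkflow`.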



##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -188,29 +256,11 @@ object TestUtils {
     var result: Map[OperatorIdentity, List[Tuple]] = null
     client.registerCallback[ExecutionStateUpdate](evt => {
       if (evt.state == COMPLETED) {
-        result = workflow.logicalPlan.getTerminalOperatorIds
-          .filter(terminalOpId => {
-            val uri = getResultUriByLogicalPortId(
-              workflow.context.executionId,
-              terminalOpId,
-              PortIdentity()
-            )
-            uri.nonEmpty
-          })
-          .map(terminalOpId => {
-            val uri = getResultUriByLogicalPortId(
-              workflow.context.executionId,
-              terminalOpId,
-              PortIdentity()
-            ).get
-            terminalOpId -> DocumentFactory
-              .openDocument(uri)
-              ._1
-              .asInstanceOf[VirtualDocument[Tuple]]
-              .get()
-              .toList
-          })
-          .toMap
+        result = readMaterializedResults(
+          workflow.context.executionId,
+          workflow.logicalPlan.getTerminalOperatorIds,
+          _.get().toList
+        )

Review Comment:
   Maybe we can provide a similar API to `executeWorkflow(workflow: Workflow)` (something like `readMaterializedResults(executionId: ExecutionId)`, which takes those opIds as default values), just to make the caller's life easier.
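A minimal sketch of the suggested convenience overload, again with hypothetical stand-ins: an in-memory `store` replaces `getResultUriByLogicalPortId`/`DocumentFactory`, and rows are strings instead of `Tuple`. Because the diff takes the terminal operator ids from `workflow.logicalPlan` rather than from the execution id, the sketch assumes the overload takes the `Workflow` and derives both the execution id and the default operator ids from it.

```scala
object ReadOverloadSketch {
  final case class OperatorIdentity(id: String)
  final case class ExecutionIdentity(id: Long)

  // Hypothetical stand-in: a Workflow carries its execution id and terminal operators.
  final case class Workflow(
      executionId: ExecutionIdentity,
      terminalOperatorIds: List[OperatorIdentity]
  )

  // Hypothetical result store; only materialized outputs appear here.
  val store: Map[(ExecutionIdentity, OperatorIdentity), List[String]] =
    Map((ExecutionIdentity(7L), OperatorIdentity("sink")) -> List("r1", "r2"))

  // General form, same shape as the PR's helper: explicit ids and extractor.
  // Operators with no stored result are dropped by flatMap over Option.
  def readMaterializedResults[T](
      executionId: ExecutionIdentity,
      operatorIds: Iterable[OperatorIdentity],
      extract: List[String] => T
  ): Map[OperatorIdentity, T] =
    operatorIds.flatMap { op =>
      store.get((executionId, op)).map(rows => op -> extract(rows))
    }.toMap

  // Convenience overload: the workflow's terminal operators, rows as a List.
  def readMaterializedResults(workflow: Workflow): Map[OperatorIdentity, List[String]] =
    readMaterializedResults(
      workflow.executionId,
      workflow.terminalOperatorIds,
      (rows: List[String]) => rows
    )
}
```

Callers with the common case write `readMaterializedResults(workflow)`; specs that need other operators or a different extraction still use the three-argument form.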



##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -78,6 +79,73 @@ object TestUtils {
     )
   }
 
+  /**
+    * Resolve and read each operator's external RESULT document at `executionId`,
+    * applying `extract` to the opened document. Operators with no external
+    * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared
+    * by the e2e specs so the lookup-open-extract block doesn't drift between
+    * copies.
+    */
+  def readMaterializedResults[T](
+      executionId: ExecutionIdentity,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T
+  ): Map[OperatorIdentity, T] =
+    operatorIds.flatMap { opId =>
+      getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri =>
+        opId -> extract(
+          DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+        )
+      }
+    }.toMap
+
+  /**
+    * Run `workflow` to COMPLETED, then read the requested operators' materialized
+    * results via `readMaterializedResults`. A FatalError aborts the run and is
+    * surfaced as the exception from the completion await. Shared by the simple
+    * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
+    * specs that drive the run differently (e.g. reconfiguration's pause/resume)
+    * call `readMaterializedResults` directly inside their own completion callback.
+    */
+  def runWorkflowAndReadResults[T](
+      system: ActorSystem,
+      workflow: Workflow,
+      operatorIds: Iterable[OperatorIdentity],
+      extract: VirtualDocument[Tuple] => T,
+      completionTimeout: Duration = Duration.fromMinutes(1)
+  ): Map[OperatorIdentity, T] = {
+    // The Promise carries the result so completing the run and handing back the
+    // value are atomic. Every terminal path uses `updateIfEmpty`, so a second
+    // event (a late FatalError after COMPLETED, or a repeated state update)
+    // can't throw inside a callback and get swallowed -- which would otherwise
+    // mask the real failure as a timeout. A read failure inside the COMPLETED
+    // callback fails the Promise (via `Try`) instead of hanging, and
+    // `shutdown()` runs in a `finally` so a timeout or error can't leak the
+    // client's actors.
+    val completion = Promise[Map[OperatorIdentity, T]]()

Review Comment:
   nit: Since amber generally exposes async events/results as Twitter Futures, it may be better for the test-facing API to also work with Futures rather than exposing Promises directly. Internally, using Promise to control completion in tests is still fine.
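A sketch of the shape described here: the Promise stays internal and the test-facing function hands back a Future, with a blocking wrapper that awaits it and always shuts the client down. To stay dependency-free it uses the standard library's `scala.concurrent` types in place of the Twitter `com.twitter.util` ones referred to above (`tryComplete`/`tryFailure` where the diff uses `updateIfEmpty`, `Await.result` with a `FiniteDuration`). `FakeClient` and the `Event` types are hypothetical stand-ins for `AmberClient` and its callbacks.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object FutureFacingSketch {
  // Hypothetical events standing in for AmberClient's state-update and
  // FatalError callbacks.
  sealed trait Event
  case object Completed extends Event
  final case class Fatal(cause: Throwable) extends Event

  // Hypothetical stand-in for AmberClient: one registered handler, events
  // fired by the test, and a flag recording whether shutdown() ran.
  final class FakeClient {
    private var handler: Event => Unit = _ => ()
    var shutDown: Boolean = false
    def onEvent(h: Event => Unit): Unit = handler = h
    def fire(e: Event): Unit = handler(e)
    def shutdown(): Unit = shutDown = true
  }

  // Test-facing API: returns a Future; the Promise never leaves this method.
  def resultsFuture[T](client: FakeClient, read: () => T): Future[T] = {
    val completion = Promise[T]()
    client.onEvent {
      // tryComplete/tryFailure return false instead of throwing when the
      // Promise is already set, so a late or repeated event is ignored.
      // Try(read()) turns a read failure into a failed Future, not a hang.
      case Completed  => completion.tryComplete(Try(read()))
      case Fatal(err) => completion.tryFailure(err)
    }
    completion.future
  }

  // Blocking wrapper for simple specs: start the run, await the Future with
  // a timeout, and shut the client down on every path.
  def runAndRead[T](
      client: FakeClient,
      read: () => T,
      drive: FakeClient => Unit,
      timeout: FiniteDuration = 1.minute
  ): T = {
    val result = resultsFuture(client, read)
    try {
      drive(client)
      Await.result(result, timeout)
    } finally client.shutdown()
  }
}
```

Specs that want to compose or chain on the outcome can use `resultsFuture` directly; `runAndRead` covers the plain run-and-read case without exposing the Promise.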


