aglinxinyuan commented on code in PR #5712:
URL: https://github.com/apache/texera/pull/5712#discussion_r3410530987
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/DataProcessingSpec.scala:
##########
@@ -101,54 +95,13 @@ class DataProcessingSpec
TestKit.shutdownActorSystem(system)
}
- def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] = {
- var results: Map[OperatorIdentity, List[Tuple]] = null
- val client = new AmberClient(
+ def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[Tuple]] =
+ runWorkflowAndReadResults(
system,
- workflow.context,
- workflow.physicalPlan,
- ControllerConfig.default,
- error => {}
+ workflow,
+ workflow.logicalPlan.getTerminalOperatorIds,
+ _.get().toList
)
Review Comment:
Done in `ce29f12` — the run-and-read logic now lives in
`TestUtils.runWorkflowAndReadTerminalResults(system, workflow)` (defaults to
the workflow's terminal ops + `List[Tuple]`). `executeWorkflow` is now a
one-line delegate that just binds the spec's `system`, so the call sites stay
unchanged.
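The delegation chain described above is sketched below. `ActorSystemStub`, `TestUtilsSketch`, and `DataProcessingSpecSketch` are hypothetical stand-ins. The general helper's body is a placeholder that only tags each operator id, because actually running a workflow needs the engine.

```scala
// Hypothetical stand-ins: not Akka's ActorSystem or Texera's Workflow.
final case class ActorSystemStub(name: String)
final case class OperatorIdentity(id: String)
final case class Workflow(terminalOperatorIds: List[OperatorIdentity])

object TestUtilsSketch {
  // General helper: the caller chooses the operators and the extractor.
  // Placeholder body: the real helper runs the workflow to COMPLETED and reads
  // each operator's materialized document; here `extract` gets the id instead.
  def runWorkflowAndReadResults[T](
      system: ActorSystemStub,
      workflow: Workflow,
      operatorIds: Iterable[OperatorIdentity],
      extract: OperatorIdentity => T
  ): Map[OperatorIdentity, T] =
    operatorIds.map(id => id -> extract(id)).toMap

  // Convenience form: defaults to the terminal ops and a List per operator.
  def runWorkflowAndReadTerminalResults(
      system: ActorSystemStub,
      workflow: Workflow
  ): Map[OperatorIdentity, List[String]] =
    runWorkflowAndReadResults(
      system,
      workflow,
      workflow.terminalOperatorIds,
      (id: OperatorIdentity) => List(s"${system.name}:${id.id}")
    )
}

// The spec keeps its old signature and only binds its own `system`,
// so existing `executeWorkflow(workflow)` call sites don't change.
class DataProcessingSpecSketch(system: ActorSystemStub) {
  def executeWorkflow(workflow: Workflow): Map[OperatorIdentity, List[String]] =
    TestUtilsSketch.runWorkflowAndReadTerminalResults(system, workflow)
}
```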
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -78,6 +79,73 @@ object TestUtils {
)
}
+ /**
+ * Resolve and read each operator's external RESULT document at `executionId`,
+ * applying `extract` to the opened document. Operators with no external
+ * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared
+ * by the e2e specs so the lookup-open-extract block doesn't drift between
+ * copies.
+ */
+ def readMaterializedResults[T](
+ executionId: ExecutionIdentity,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T
+ ): Map[OperatorIdentity, T] =
+ operatorIds.flatMap { opId =>
+ getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri =>
+ opId -> extract(
+ DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ )
+ }
+ }.toMap
+
+ /**
+ * Run `workflow` to COMPLETED, then read the requested operators' materialized
+ * results via `readMaterializedResults`. A FatalError aborts the run and is
+ * surfaced as the exception from the completion await. Shared by the simple
+ * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
Review Comment:
Good catch — `LoopIntegrationSpec` isn't in this PR. Dropped the specific
spec names from the docstring in `ce29f12`; it now just describes the behavior.
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -78,6 +79,73 @@ object TestUtils {
)
}
+ /**
+ * Resolve and read each operator's external RESULT document at `executionId`,
+ * applying `extract` to the opened document. Operators with no external
+ * RESULT uri (e.g. one whose output wasn't materialized) are omitted. Shared
+ * by the e2e specs so the lookup-open-extract block doesn't drift between
+ * copies.
+ */
+ def readMaterializedResults[T](
+ executionId: ExecutionIdentity,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T
+ ): Map[OperatorIdentity, T] =
+ operatorIds.flatMap { opId =>
+ getResultUriByLogicalPortId(executionId, opId, PortIdentity()).map { uri =>
+ opId -> extract(
+ DocumentFactory.openDocument(uri)._1.asInstanceOf[VirtualDocument[Tuple]]
+ )
+ }
+ }.toMap
+
+ /**
+ * Run `workflow` to COMPLETED, then read the requested operators' materialized
+ * results via `readMaterializedResults`. A FatalError aborts the run and is
+ * surfaced as the exception from the completion await. Shared by the simple
+ * "run and read" e2e specs (e.g. DataProcessingSpec, LoopIntegrationSpec);
+ * specs that drive the run differently (e.g. reconfiguration's pause/resume)
+ * call `readMaterializedResults` directly inside their own completion callback.
+ */
+ def runWorkflowAndReadResults[T](
+ system: ActorSystem,
+ workflow: Workflow,
+ operatorIds: Iterable[OperatorIdentity],
+ extract: VirtualDocument[Tuple] => T,
+ completionTimeout: Duration = Duration.fromMinutes(1)
+ ): Map[OperatorIdentity, T] = {
+ // The Promise carries the result so completing the run and handing back the
+ // value are atomic. Every terminal path uses `updateIfEmpty`, so a second
+ // event (a late FatalError after COMPLETED, or a repeated state update)
+ // can't throw inside a callback and get swallowed -- which would otherwise
+ // mask the real failure as a timeout. A read failure inside the COMPLETED
+ // callback fails the Promise (via `Try`) instead of hanging, and
+ // `shutdown()` runs in a `finally` so a timeout or error can't leak the
+ // client's actors.
+ val completion = Promise[Map[OperatorIdentity, T]]()
Review Comment:
The method returns the materialized `Map[OperatorIdentity, T]` (it `Await`s
internally) rather than exposing the `Promise`. The `Promise` is only used
internally for completion control, which you noted is fine. I kept it returning
the resolved value instead of a `Future` because the callers (and the new
terminal-result convenience) want the blocking result. Happy to switch to
`Future[Map[...]]` if you'd rather callers `Await` themselves.
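The blocking pattern described above is sketched below using the standard library's `scala.concurrent.Promise`. Its `tryComplete`/`tryFailure` stand in for the diff's `updateIfEmpty`. `FakeClient`, `BlockingRun`, and `runAndRead` are hypothetical stand-ins, not the `AmberClient` API, and execution states are plain strings.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical stand-in for the engine client (NOT AmberClient): tests fire
// events through `emitState` / `emitError` to invoke the registered callbacks.
final class FakeClient {
  private var onState: String => Unit = _ => ()
  private var onError: Throwable => Unit = _ => ()
  var isShutDown: Boolean = false
  def onStateUpdate(f: String => Unit): Unit = onState = f
  def onFatalError(f: Throwable => Unit): Unit = onError = f
  def emitState(state: String): Unit = onState(state)
  def emitError(e: Throwable): Unit = onError(e)
  def shutdown(): Unit = isShutDown = true
}

object BlockingRun {
  // Blocks until the run completes, fails, or times out, and returns the
  // resolved value; the Promise never escapes this method.
  def runAndRead[T](
      client: FakeClient,
      drive: FakeClient => Unit,
      read: () => T,
      timeout: FiniteDuration
  ): T = {
    val completion = Promise[T]()
    // `tryComplete` / `tryFailure` return false instead of throwing when the
    // Promise is already completed, so a late error or a repeated COMPLETED
    // is ignored rather than thrown (and swallowed) inside a callback.
    client.onStateUpdate { state =>
      // Try(...) turns a read failure into a failed Promise instead of a hang.
      if (state == "COMPLETED") completion.tryComplete(Try(read()))
    }
    client.onFatalError(e => completion.tryFailure(e))
    try {
      drive(client)
      // Rethrows the Promise's failure, or a TimeoutException if it never completes.
      Await.result(completion.future, timeout)
    } finally client.shutdown() // runs on success, failure, and timeout
  }
}
```

Returning `completion.future` instead would let callers `Await` themselves. However, `shutdown()` would then have to move into an `onComplete` callback, which needs an `ExecutionContext`, because a `finally` would run before the workflow finishes.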
##########
amber/src/test/scala/org/apache/texera/amber/engine/e2e/TestUtils.scala:
##########
@@ -188,29 +256,11 @@ object TestUtils {
var result: Map[OperatorIdentity, List[Tuple]] = null
client.registerCallback[ExecutionStateUpdate](evt => {
if (evt.state == COMPLETED) {
- result = workflow.logicalPlan.getTerminalOperatorIds
- .filter(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- )
- uri.nonEmpty
- })
- .map(terminalOpId => {
- val uri = getResultUriByLogicalPortId(
- workflow.context.executionId,
- terminalOpId,
- PortIdentity()
- ).get
- terminalOpId -> DocumentFactory
- .openDocument(uri)
- ._1
- .asInstanceOf[VirtualDocument[Tuple]]
- .get()
- .toList
- })
- .toMap
+ result = readMaterializedResults(
+ workflow.context.executionId,
+ workflow.logicalPlan.getTerminalOperatorIds,
+ _.get().toList
+ )
Review Comment:
Added in `ce29f12` — there is now a `readMaterializedResults(workflow)`
overload that defaults to the workflow's terminal ops + `List[Tuple]`, and
`shouldReconfigure` uses it. For the full run-and-read case there is
`runWorkflowAndReadTerminalResults(system, workflow)` (what `executeWorkflow`
now delegates to).
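The read-side refactor is sketched below. Each operator gets a single lookup, and `flatMap` over the `Option` replaces the old `filter(uri.nonEmpty)` plus `map(... .get)` double lookup. The sketch also includes the terminal-ops overload. `ResultReader`, the stand-in `OperatorIdentity`/`ExecutionIdentity`/`VirtualDocument`/`Workflow` types, and the in-memory map that replaces the uri lookup and `DocumentFactory.openDocument` are all hypothetical. Tuples are plain `String`s.

```scala
// Hypothetical stand-ins for the Texera types; illustration only.
final case class OperatorIdentity(id: String)
final case class ExecutionIdentity(id: Long)
trait VirtualDocument[T] { def get(): Iterator[T] }
final case class Workflow(executionId: ExecutionIdentity, terminalOperatorIds: Set[OperatorIdentity])

// `docs` plays the role of "resolve the RESULT uri, then open the document";
// operators whose output wasn't materialized simply have no entry.
final class ResultReader(docs: Map[(ExecutionIdentity, OperatorIdentity), VirtualDocument[String]]) {
  private def lookup(exec: ExecutionIdentity, op: OperatorIdentity): Option[VirtualDocument[String]] =
    docs.get((exec, op))

  // General form: one lookup per operator; flatMap over the Option omits
  // operators without a result instead of filtering and then calling `.get`.
  def readMaterializedResults[T](
      exec: ExecutionIdentity,
      operatorIds: Iterable[OperatorIdentity],
      extract: VirtualDocument[String] => T
  ): Map[OperatorIdentity, T] =
    operatorIds.flatMap(op => lookup(exec, op).map(doc => op -> extract(doc))).toMap

  // Convenience overload: the workflow's terminal ops, each read into a List.
  def readMaterializedResults(workflow: Workflow): Map[OperatorIdentity, List[String]] =
    readMaterializedResults(
      workflow.executionId,
      workflow.terminalOperatorIds,
      (doc: VirtualDocument[String]) => doc.get().toList
    )
}
```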
--
This is an automated message from the Apache Git Service.