bobbai00 opened a new pull request, #4615:
URL: https://github.com/apache/texera/pull/4615
### What changes were proposed in this PR?
This PR fixes a race in `SyncExecutionResource.allTargetsCompleted` that
causes the sync execution API (`POST /api/execution/{wid}/{cuid}/run`) to
terminate before a HashJoin's probe phase produces output, returning an empty
result.
**Root cause.** `HashJoinOpDesc.getPhysicalPlan` produces two PhysicalOps
(`build`, `probe`) sharing one logical id, separated by a blocking edge. The
scheduler places them in two regions and runs them sequentially.
`WorkflowExecution.getAllRegionExecutionsStats` aggregates per-logical-op state
by `groupBy(_._1.logicalOpId.id)` over only the *registered*
`RegionExecution`s. Between "build region completed" and "probe region
instantiated," only the build PhysicalOp is registered, so
`aggregateStates(Iterable(COMPLETED))` returns `COMPLETED`. The sync resource
then takes the `TargetResultsReady` branch, calls `killExecution`, and reads
the probe's still-empty Iceberg output. The same race can affect any logical
operator whose physical plan contains multiple PhysicalOps separated by a
blocking edge (e.g., `Aggregate`). It does not surface in regular
WebSocket-driven frontend execution because the frontend waits for the whole
workflow to terminate.
**Fix.** Strengthen `allTargetsCompleted` to require, in addition to
`operatorState == COMPLETED`, that every declared external input port of the
target is already present in `OperatorMetrics.operatorStatistics.inputMetrics`.
Port-1 metrics only appear after the probe actually consumes data, which closes
the race window. Internal ports (e.g., HashJoin's build→probe internal edge)
are filtered out on both sides of the comparison so the predicate matches what
`aggregateMetrics` already exposes. Source operators (zero declared inputs) and
single-input operators are unaffected; for empty-input edge cases,
`terminalStateObservable` continues to provide the fallback signal.
```scala
// Per target operator: how many *external* input ports the logical plan declares.
// Internal ports (e.g., HashJoin's build→probe edge) are excluded.
val targetExpectedExternalInputs: Map[String, Int] =
  effectiveLogicalPlan.operators
    .filter(op => request.targetOperatorIds.contains(op.operatorIdentifier.id))
    .map(op =>
      op.operatorIdentifier.id ->
        op.operatorInfo.inputPorts.count(!_.id.internal)
    )
    .toMap

// A target counts as done only when it is COMPLETED *and* at least as many
// external input ports report metrics as the plan declares (for HashJoin,
// port 1 only reports once the probe has consumed data).
def allTargetsCompleted(stats: ExecutionStatsStore): Boolean = {
  request.targetOperatorIds.nonEmpty && request.targetOperatorIds.forall {
    opId =>
      stats.operatorInfo.get(opId).exists { metrics =>
        val externalInputPortsReporting =
          metrics.operatorStatistics.inputMetrics.count(!_.portId.internal)
        val expectedExternalInputs =
          targetExpectedExternalInputs.getOrElse(opId, 0)
        metrics.operatorState == COMPLETED &&
        externalInputPortsReporting >= expectedExternalInputs
      }
  }
}
```
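The predicate can be exercised in isolation with simplified stand-in types. This sketch is hypothetical: `PortIdentity`, `PortMetric`, and `OperatorMetrics` below are minimal case classes that only mirror the field names used in the snippet above, and the operator state is reduced to a `completed` flag. With a target that expects two external inputs, a build-only snapshot (only port 0 reporting) is rejected, while a snapshot in which port 1 also reports is accepted; internal ports never count.

```scala
// Hypothetical stand-ins for the Texera types referenced in the PR snippet.
object PredicateSketch {
  final case class PortIdentity(id: Int, internal: Boolean)
  final case class PortMetric(portId: PortIdentity, tupleCount: Long)
  final case class OperatorMetrics(completed: Boolean, inputMetrics: Seq[PortMetric])

  // Same shape as the PR's predicate: each target must be completed AND report
  // at least as many external input ports as its logical plan declares.
  def allTargetsCompleted(
      targets: Seq[String],
      expectedExternalInputs: Map[String, Int],
      snapshot: Map[String, OperatorMetrics]
  ): Boolean =
    targets.nonEmpty && targets.forall { opId =>
      snapshot.get(opId).exists { m =>
        // Internal ports are excluded here, just as on the plan side.
        val reporting = m.inputMetrics.count(!_.portId.internal)
        m.completed && reporting >= expectedExternalInputs.getOrElse(opId, 0)
      }
    }
}
```

A source operator declares zero external inputs, so its expected count is 0 and the check reduces to the state test, consistent with the claim that sources are unaffected.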
### Any related issues, documentation, discussions?
Fixes #4576
### How was this PR tested?
Manually reproduced and verified end-to-end against `ComputingUnitMaster` on
port 8085 with a 3-operator DAG (CSVFileScan movies + CSVFileScan ratings →
HashJoin on `movieId`) executed via `POST /api/execution/{wid}/{cuid}/run` with
`targetOperatorIds = [HashJoinId]`. Inputs: `movies.csv` (1000 rows) and
`ratings.csv` (10 311 rows) from a LakeFS-managed dataset.
| metric | before fix | after fix |
|---|---|---|
| `success` | true | true |
| workflow `state` | Completed | Completed |
| HashJoin `state` | Running | Completed |
| HashJoin `inputTuples` | 1 000 | 11 311 |
| `inputPortShapes` | port 0 only (1 000 rows) | port 0: 1 000, port 1: 10 311 |
| HashJoin `outputTuples` | 0 | 10 311 |
| `result` (sample) | `[]` | `[{movieId: 47, title: "Seven (a.k.a. Se7en) (1995)", userId: 1, rating: "5.0", ...}, ...]` |
Steps to reproduce / verify:
```shell
# 1. Start the master
sbt "project WorkflowExecutionService" compile
java ... org.apache.texera.web.ComputingUnitMaster # listens on :8085
# 2. Get a JWT
curl -s -X POST http://localhost:8080/api/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"<user>","password":"<pw>"}'
# 3. POST the request (CSV → CSV → HashJoin, target = HashJoin)
curl -s -X POST http://localhost:8085/api/execution/<wid>/<cuid>/run \
-H "Content-Type: application/json" \
-H "Authorization: Bearer <token>" \
--data @sync-exec-request.json
```
Existing tests pass (`sbt "project WorkflowExecutionService" compile`
succeeds). No new unit test was added because the failure is a timing race in
the controller's region-registration sequence relative to the sync resource's
observable; reproducing it deterministically in a unit test would require
either mocking `ExecutionStatsStore` to emit a build-only snapshot followed by
a build+probe snapshot, or driving the full controller actor system, both of
which are out of scope for this targeted fix. Manual reproduction is reliable:
the race window is several hundred milliseconds wide, so before this fix
`Observable.amb` consistently selected the (premature) target-completion
signal first on every run.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Claude Opus 4.7)
--
This is an automated message from the Apache Git Service.