kunwp1 opened a new pull request, #5562:
URL: https://github.com/apache/texera/pull/5562
### What changes were proposed in this PR?
`ControllerRpcProbe` (in `RegionCoordinatorTestSupport`) captures
controller-to-worker RPCs in a mutable `ArrayBuffer`. That buffer is appended
to on the Pekko actor/scheduler thread via the output-gateway callback, while
the test thread iterates it through the read helpers (`methodTrace`,
`initializedWorkers`, `startedWorkers`, `endWorkerCalls`). Because the buffer
was unsynchronized, an append racing a `filter`/`map` was caught by Scala
2.13's `MutationTracker` and thrown as a hard
`java.util.ConcurrentModificationException`.
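You can see the check that fired without any threads. On Scala 2.13 releases that include `MutationTracker`, an `ArrayBuffer` iterator records the buffer's mutation count when it is created. It compares that count again on `hasNext`, so an append between two iteration steps raises the exception. The sketch below reproduces this on a single thread. The object and method names are illustrative and do not come from the Texera codebase. Across threads the check is only best-effort, because the counter itself is not synchronized. An unguarded race can therefore also go unnoticed and read inconsistent data, which is why a lock is the actual fix.

```scala
import java.util.ConcurrentModificationException
import scala.collection.mutable.ArrayBuffer

// Illustrative single-threaded reproduction of the MutationTracker check
// (hypothetical names; not part of the PR).
object MutationDuringIteration {
  // Appends to the buffer while an iterator over it is still live, then
  // advances the iterator. Returns the exception message if the check fires.
  def appendWhileIterating(): Option[String] = {
    val calls = ArrayBuffer("InitializeWorker", "StartWorker")
    val it = calls.iterator // captures the buffer's current mutation count
    it.next()               // iteration is now in progress
    calls += "EndWorker"    // the append bumps the mutation count
    try {
      if (it.hasNext) it.next() // hasNext compares the counts and throws
      None
    } catch {
      case e: ConcurrentModificationException => Some(e.getMessage)
    }
  }
}
```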
This surfaced as a non-deterministic failure of
`RegionExecutionCoordinatorSpec` — most visibly the *"retry EndWorker failures
and delay gracefulStop until a retry succeeds"* test, whose
`waitUntil(endWorkerCalls.size >= 2)` loop polls the buffer while the 200 ms
`EndWorker` retry appends to it from a scheduler thread.
The fix guards the buffer with a dedicated lock:
- the append (`calls += call`) synchronizes on the lock;
- the read helpers take an immutable snapshot (`calls.toSeq`) under the same
lock and filter/map the snapshot, so iteration can never race a concurrent
append;
- the lock is released before `fulfill()` runs, so no RPC callback executes
while the lock is held (no re-entrancy/deadlock risk).
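Here is a minimal sketch of the locking pattern described above, assuming a plain `Object` monitor as the dedicated lock. `RecordedCall`, `GuardedCallLog`, `record`, and the `fulfill` parameter are hypothetical stand-ins. The real probe's call type and callback wiring are not shown in this PR.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for one captured controller-to-worker RPC.
final case class RecordedCall(method: String, worker: String)

// Sketch of the lock-guarded probe buffer (illustrative names only).
final class GuardedCallLog {
  private val lock = new Object
  private val calls = ArrayBuffer.empty[RecordedCall]

  // Writer side (e.g. an output-gateway callback): append under the lock,
  // then run the fulfillment callback only after the lock is released.
  def record(call: RecordedCall)(fulfill: () => Unit): Unit = {
    lock.synchronized {
      calls += call
    }
    fulfill() // runs with the lock released, so it can never block readers
  }

  // Reader side: copy to an immutable Seq under the lock, then work on the copy.
  private def snapshot(): Seq[RecordedCall] = lock.synchronized(calls.toSeq)

  def methodTrace: Seq[String] = snapshot().map(_.method)
  def endWorkerCalls: Seq[RecordedCall] = snapshot().filter(_.method == "EndWorker")
}
```

Only the copy happens inside `synchronized`. The `filter`/`map` work then runs on an immutable `Seq` that no other thread can mutate, and `fulfill` never runs while the lock is held.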
No production code is changed — only the test-support probe and a new test.
`calls` is now `private`; it was only ever read through these helpers.
### Any related issues, documentation, discussions?
Resolves #5546
### How was this PR tested?
Added `ControllerRpcProbeSpec`, a focused concurrency stress test: a writer
thread drives 20,000 appends through `outputGateway.sendTo` (the same boundary
the coordinator uses) while a reader thread iterates every helper concurrently,
asserting no exception is thrown.
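A sketch of the same writer/reader shape follows. It assumes the buffer is driven directly instead of through `outputGateway.sendTo`. `SnapshotStress`, `GuardedLog`, and `run` are hypothetical names, and the harness simply records the first exception either thread throws. Swapping `GuardedLog` for an unsynchronized `ArrayBuffer` gives the "before" case, though how quickly that version fails depends on thread scheduling.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scala.collection.mutable.ArrayBuffer

// Hypothetical writer/reader stress harness; not the actual ControllerRpcProbeSpec.
object SnapshotStress {
  // Lock-guarded buffer mirroring the fix: locked appends, snapshot reads.
  final class GuardedLog {
    private val lock = new Object
    private val calls = ArrayBuffer.empty[Int]
    def append(x: Int): Unit = lock.synchronized { calls += x }
    def evens: Seq[Int] = lock.synchronized(calls.toSeq).filter(_ % 2 == 0)
    def size: Int = lock.synchronized(calls.size)
  }

  // Appends `appends` values on a writer thread while a reader thread keeps
  // iterating; returns the first exception either thread hit and the final size.
  def run(appends: Int): (Option[Throwable], Int) = {
    val log = new GuardedLog
    val failure = new AtomicReference[Throwable](null)
    val writerDone = new AtomicBoolean(false)

    val writer = new Thread(() => {
      try (0 until appends).foreach(log.append)
      catch { case t: Throwable => failure.compareAndSet(null, t) }
      finally writerDone.set(true)
    })
    val reader = new Thread(() => {
      try {
        while (!writerDone.get()) log.evens.foreach(_ => ())
      } catch { case t: Throwable => failure.compareAndSet(null, t) }
    })

    reader.start()
    writer.start()
    writer.join()
    reader.join()
    (Option(failure.get()), log.size)
  }
}
```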
- **Before the fix**, the stress test fails deterministically (in about 38 ms)
  with the exact error reported in the issue:
  `java.util.ConcurrentModificationException: mutation occurred during iteration`.
- **After the fix**:
  - `ControllerRpcProbeSpec` + `RegionExecutionCoordinatorSpec` +
    `WorkflowExecutionCoordinatorSpec` → **11/11 pass**;
  - the stress test passed **5/5** consecutive runs.
To reproduce (the `amber` module is the `WorkflowExecutionService` sbt project):
```
sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.ControllerRpcProbeSpec'
sbt 'WorkflowExecutionService/testOnly org.apache.texera.amber.engine.architecture.scheduling.RegionExecutionCoordinatorSpec'
```
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Anthropic Claude Opus 4.8)