Yicong-Huang opened a new issue, #4682:
URL: https://github.com/apache/texera/issues/4682
### What happened?
`OutputPortResultWriterThread.run()` calls `bufferedItemWriter.close()`,
which performs the Iceberg `commit()` for the port's output table. If
`close()` throws (for example, because Iceberg's built-in commit-retry budget
is exhausted under heavy concurrent appends), the exception propagates out of
`Thread.run()` and the writer thread dies silently:
```scala
override def run(): Unit = {
  var internalStop = false
  while (!internalStop) {
    queue.take() match {
      case Left(tuple) => bufferedItemWriter.putOne(tuple)
      case Right(_)    => internalStop = true
    }
  }
  bufferedItemWriter.close() // ← throws → thread dies, no one notices
}
```
`OutputManager.closeOutputStorageWriterIfNeeded` calls
`writerThread.join()`, which returns normally regardless of whether the thread
exited cleanly or via an exception. The DP thread then fires `portCompleted` to
the controller as if the output had been durably written.
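
The `join()` behavior can be checked in isolation. The following is a minimal sketch, independent of Texera's classes, showing that `Thread.join()` returns normally even when the joined thread ended with an uncaught exception. `JoinSwallowsFailureDemo` and the thread name are hypothetical names used only for illustration.

```scala
object JoinSwallowsFailureDemo {
  // Returns true if join() completed without throwing, even though the
  // joined thread terminated with an uncaught exception.
  def joinReturnsNormally(): Boolean = {
    val failing = new Thread(new Runnable {
      // Stand-in for bufferedItemWriter.close() failing during commit.
      override def run(): Unit =
        throw new RuntimeException("simulated commit failure")
    }, "demo-writer")

    // Replace the default handler (which prints a stack trace) with a no-op
    // so the demo stays quiet; the exception is still never seen by join().
    failing.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = ()
    })

    failing.start()
    failing.join() // does not rethrow the thread's exception
    !failing.isAlive
  }
}
```

The caller of `join()` learns only that the thread has finished, not how, which is why the failure has to be recorded and checked explicitly.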
Symptoms downstream:
- Reader-side operators (e.g., HashJoin probe reading the build-side iceberg
table) see incomplete or stale data.
- The workflow may never reach `COMPLETED`, or reaches it with empty results.
- The test or user sees only the 1-minute `Await.result(completion,
Duration.fromMinutes(1))` timeout in `DataProcessingSpec.executeWorkflow`, with
no indication of the underlying cause.
**Expected**: the writer-thread failure should propagate to the worker, then
to the controller via the existing `AllForOneStrategy` supervisor in
`Controller.scala`, then to the client as a `FatalError` — surfacing the
iceberg cause immediately instead of a silent 1-minute timeout.
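
One possible shape for the first hop of that propagation (writer thread to the joining thread) is sketched below. This is an illustration under stated assumptions, not the project's actual fix: `ItemWriter`, `FailureAwareWriterThread`, and `joinAndRethrow` are hypothetical names, and the real `bufferedItemWriter` interface may differ.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for the buffered writer; not Texera's actual interface.
trait ItemWriter[T] {
  def putOne(item: T): Unit
  def close(): Unit
}

// Hypothetical writer thread that records a failure instead of losing it.
class FailureAwareWriterThread[T](writer: ItemWriter[T]) extends Thread {
  // Left(item) enqueues an item to write; Right(()) signals termination.
  val queue = new LinkedBlockingQueue[Either[T, Unit]]()
  private val failure = new AtomicReference[Throwable](null)

  override def run(): Unit = {
    try {
      var internalStop = false
      while (!internalStop) {
        queue.take() match {
          case Left(item) => writer.putOne(item)
          case Right(_)   => internalStop = true
        }
      }
      writer.close() // may throw, e.g. when the commit fails
    } catch {
      // Record any failure so the joining thread can observe it.
      case t: Throwable => failure.set(t)
    }
  }

  // Waits for the thread to finish, then rethrows a recorded failure
  // on the caller's thread with the original exception as the cause.
  def joinAndRethrow(): Unit = {
    join()
    val t = failure.get()
    if (t != null) throw new RuntimeException("output port writer failed", t)
  }
}
```

In this sketch the caller would use `joinAndRethrow()` in place of the plain `writerThread.join()`, so the exception surfaces on the joining thread before `portCompleted` is sent. How it then reaches the supervisor and the client is left to the existing error path.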
### How to reproduce?
1. Trigger any workflow whose physical plan has multiple workers writing to
the same logical output port (HashJoin build/probe partitions, partitioned
aggregate, shuffle output, etc.).
2. Force iceberg commit to fail in `IcebergTableWriter.flushBuffer()` by
either:
   - Running on slow CI runners where multiple workers race on the same
     metadata.json (observed naturally on
     `apache/texera/actions/runs/25259338567/job/74063902070` for
     `csv->(csv->)->join workflow normally` in `DataProcessingSpec`), or
   - Mocking the iceberg `Table.newAppend().appendFile().commit()` path to
     throw `CommitFailedException` for more retries than the configured
     `commit.retry.num-retries` (10).
3. Observe: instead of a `FatalError` with the iceberg cause, the test hangs
for 1 minute and reports `com.twitter.util.TimeoutException: 1.minutes`. The
workflow's regions log "successfully terminated" right before the silence
begins.
### Branch
main (also reproduced on `release/v1.1.0-incubating`)
### Commit Hash (Optional)
8f14cec1a52cdcc913303fc8d0a7520a52d2ff62 (release/v1.1.0-incubating backport
CI for #4632)
### Relevant log output
```
[2026-05-02 19:04:38,156] [WARN] [org.apache.iceberg.util.Tasks] [Thread-964] - Retrying task after failure: Cannot commit
operator-port-result.wid_1_eid_1_globalportid_(
logicalOpId=HashJoinOpDesc-acaa31f6-...,
layerName=build, portId=0, isInternal=true, isInput=false)_result:
metadata location ...metadata/00000-...metadata.json has changed from
...metadata/00001-...metadata.json
org.apache.iceberg.exceptions.CommitFailedException: ...
at org.apache.texera.amber.engine.architecture.worker.managers.OutputPortResultWriterThread.run(OutputPortResultWriterThread.scala:47)
...
[2026-05-02 19:04:38,743] [INFO] [CONTROLLER] [RegionExecutionCoordinator] - Region 0 successfully terminated.
... 60 seconds of silence ...
[info] - should execute csv->(csv->)->join workflow normally *** FAILED ***
[info] com.twitter.util.TimeoutException: 1.minutes
[info] at com.twitter.util.Promise.ready(Promise.scala:680)
[info] ...
[info] at org.apache.texera.amber.engine.e2e.DataProcessingSpec.executeWorkflow(DataProcessingSpec.scala:151)
```
### Out of scope
- Adding our own retry layer on top of iceberg. Iceberg's built-in retry is
already at 10 retries / 100ms→10s backoff (see `storage.conf`), which is enough
headroom.
- Changing the multi-worker → single-table design. Multiple workers writing
the same iceberg table is by design (HashJoin build/probe partitions, etc.).
- The "Worker X is not reachable anymore, it might have crashed" warning
that `removeActorRef` logs for both real crashes and normal end-of-region
cleanup. That noise hides real crashes today and should be cleaned up
separately.