Yicong-Huang opened a new issue, #4682:
URL: https://github.com/apache/texera/issues/4682

   ### What happened?
   
   `OutputPortResultWriterThread.run()` calls `bufferedItemWriter.close()` 
(which performs the iceberg `commit()` for the port's output table). If 
`close()` throws — for example because iceberg's built-in commit-retry budget 
is exhausted under heavy concurrent appends — the exception propagates out of 
`Thread.run()` and the writer thread dies silently:
   
   ```scala
   override def run(): Unit = {
     var internalStop = false
     while (!internalStop) {
       queue.take() match {
         case Left(tuple) => bufferedItemWriter.putOne(tuple)
         case Right(_)    => internalStop = true
       }
     }
     bufferedItemWriter.close()    // ← throws → thread dies, no one notices
   }
   ```
   
   `OutputManager.closeOutputStorageWriterIfNeeded` calls 
`writerThread.join()`, which returns normally regardless of whether the thread 
exited cleanly or via exception. The DP thread then fires `portCompleted` to 
the controller as if the storage was durably written.
   
   Symptoms downstream:
   
   - Reader-side operators (e.g., HashJoin probe reading the build-side iceberg 
table) see incomplete or stale data.
   - The workflow may never reach `COMPLETED`, or reaches it with empty results.
   - The test / user observes a 1-minute `Await.result(completion, 
Duration.fromMinutes(1))` timeout in `DataProcessingSpec.executeWorkflow` with 
no clue why.
   
   **Expected**: the writer-thread failure should propagate to the worker, then 
to the controller via the existing `AllForOneStrategy` supervisor in 
`Controller.scala`, then to the client as a `FatalError` — surfacing the 
iceberg cause immediately instead of a silent 1-minute timeout.
   
   ### How to reproduce?
   
   1. Trigger any workflow whose physical plan has multiple workers writing to 
the same logical output port (HashJoin build/probe partitions, partitioned 
aggregate, shuffle output, etc.).
   2. Force iceberg commit to fail in `IcebergTableWriter.flushBuffer()` by 
either:
      - Running on slow CI runners where multiple workers race on the same 
metadata.json (observed naturally on 
`apache/texera/actions/runs/25259338567/job/74063902070` for 
`csv->(csv->)->join workflow normally` in `DataProcessingSpec`), or
      - Mocking the iceberg `Table.newAppend().appendFile().commit()` path to 
throw `CommitFailedException` for more retries than the configured 
`commit.retry.num-retries` (10).
   3. Observe: instead of a `FatalError` with the iceberg cause, the test hangs 
for 1 minute and reports `com.twitter.util.TimeoutException: 1.minutes`. The 
workflow's regions log "successfully terminated" right before the silence 
begins.
   
   ### Branch
   
   main (also reproduced on `release/v1.1.0-incubating`)
   
   ### Commit Hash (Optional)
   
   8f14cec1a52cdcc913303fc8d0a7520a52d2ff62 (release/v1.1.0-incubating backport 
CI for #4632)
   
   ### Relevant log output
   
   ```
   [2026-05-02 19:04:38,156] [WARN] [org.apache.iceberg.util.Tasks] 
[Thread-964] -
     Retrying task after failure: Cannot commit 
operator-port-result.wid_1_eid_1_globalportid_(
       logicalOpId=HashJoinOpDesc-acaa31f6-...,
       layerName=build, portId=0, isInternal=true, isInput=false)_result:
     metadata location ...metadata/00000-...metadata.json has changed from 
...metadata/00001-...metadata.json
   org.apache.iceberg.exceptions.CommitFailedException: ...
       at 
org.apache.texera.amber.engine.architecture.worker.managers.OutputPortResultWriterThread.run(OutputPortResultWriterThread.scala:47)
   ...
   [2026-05-02 19:04:38,743] [INFO] [CONTROLLER] [RegionExecutionCoordinator] - 
Region 0 successfully terminated.
   ... 60 seconds of silence ...
   [info] - should execute csv->(csv->)->join workflow normally *** FAILED ***
   [info]   com.twitter.util.TimeoutException: 1.minutes
   [info]   at com.twitter.util.Promise.ready(Promise.scala:680)
   [info]   ...
   [info]   at 
org.apache.texera.amber.engine.e2e.DataProcessingSpec.executeWorkflow(DataProcessingSpec.scala:151)
   ```
   
   ### Out of scope
   
   - Adding our own retry layer on top of iceberg. Iceberg's built-in retry is 
already at 10 retries / 100ms→10s backoff (see `storage.conf`), which is enough 
headroom.
   - Changing the multi-worker → single-table design. Multiple workers writing 
the same iceberg table is by design (HashJoin build/probe partitions, etc.).
   - The "Worker X is not reachable anymore, it might have crashed" warning 
that `removeActorRef` logs for both real crashes and normal end-of-region 
cleanup. That noise hides real crashes today and should be cleaned up 
separately.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to