Yicong-Huang opened a new pull request, #4683:
URL: https://github.com/apache/texera/pull/4683

   ### What changes were proposed in this PR?
   
   Fix the silent-writer-failure path in worker output ports. Today, when 
`OutputPortResultWriterThread.run()` throws (for example, because Iceberg's 
commit-retry budget is exhausted), the exception escapes `Thread.run()` and the 
writer thread dies. `OutputManager.closeOutputStorageWriterIfNeeded` calls 
`writerThread.join()`, which returns normally because `join()` does not surface 
the joined thread's exception. The worker then reports `portCompleted` to the 
controller, and downstream operators read incomplete results. Tests and users 
end up hitting the 1-minute `Await.result(completion, Duration.fromMinutes(1))` 
timeout with no signal pointing at Iceberg.
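
The failure mode described above can be reproduced outside Texera. The following is a minimal standalone sketch, not Texera code: `SilentWriterFailureDemo` and `joinFailingThread` are hypothetical names, and the exception message is made up. It only shows that `join()` returns normally after the joined thread's `run()` has thrown.

```scala
import java.util.concurrent.atomic.AtomicReference

// Standalone illustration (hypothetical names, not Texera code).
object SilentWriterFailureDemo {

  /** Starts a thread whose run() throws, joins it, and returns
    * (whether join() returned with the thread dead, what the thread threw).
    */
  def joinFailingThread(): (Boolean, Option[Throwable]) = {
    val uncaught = new AtomicReference[Throwable](null)

    val writer = new Thread {
      override def run(): Unit =
        throw new IllegalStateException("commit retries exhausted")
    }
    // Record the exception instead of letting the default handler print it.
    writer.setUncaughtExceptionHandler(
      (_: Thread, e: Throwable) => uncaught.set(e)
    )

    writer.start()
    writer.join() // returns normally; the exception is not rethrown here

    (!writer.isAlive, Option(uncaught.get()))
  }

  def main(args: Array[String]): Unit = {
    val (joinReturned, thrown) = joinFailingThread()
    println(s"join returned normally: $joinReturned, writer threw: $thrown")
  }
}
```

The caller only learns about the exception because the sketch installs an uncaught-exception handler; without one, nothing reaches the joining thread.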
   
   This PR makes the writer-thread failure observable and fatal:
   
   1. `OutputPortResultWriterThread.run()` wraps its loop and the `close()` 
call in `try { ... } catch { case NonFatal(e) => failure = Some(e) }`. The 
captured failure is stored in a `@volatile` field and exposed via `getFailure`.
   2. `OutputManager.closeOutputStorageWriterIfNeeded` re-throws the captured 
failure after `writerThread.join()`.
   3. The throw propagates up the DP thread. `DPThread.start()` already routes 
uncaught DP-thread exceptions to the worker actor's main thread via 
`MainThreadDelegateMessage((worker) => throw err)`. The worker actor's throw 
triggers the controller's existing `AllForOneStrategy` supervisor in 
`Controller.scala`, which calls `cp.asyncRPCClient.sendToClient(FatalError(e, 
failedWorker))`.
   
   Net effect: instead of a silent 1-minute timeout, the test or user sees a 
`FatalError` immediately, with the Iceberg cause attached.
   
   ```diff
    // OutputPortResultWriterThread.scala
   +@volatile private var failure: Option[Throwable] = None
   +def getFailure: Option[Throwable] = failure
   +
    override def run(): Unit = {
   -  var internalStop = false
   -  while (!internalStop) {
   -    queue.take() match {
   -      case Left(tuple) => bufferedItemWriter.putOne(tuple)
   -      case Right(_)    => internalStop = true
   +  try {
   +    var internalStop = false
   +    while (!internalStop) {
   +      queue.take() match {
   +        case Left(tuple) => bufferedItemWriter.putOne(tuple)
   +        case Right(_)    => internalStop = true
   +      }
        }
   +    bufferedItemWriter.close()
   +  } catch {
   +    case NonFatal(e) => failure = Some(e)
      }
   -  bufferedItemWriter.close()
    }
   
    // OutputManager.scala — closeOutputStorageWriterIfNeeded
          writerThread.queue.put(Right(PortStorageWriterTerminateSignal))
          writerThread.join()
   +      writerThread.getFailure.foreach(throw _)
   ```
   
   What this PR explicitly does **not** change:
   
   - The Iceberg commit retry budget. It is already 10 retries with 100ms→10s 
backoff in `storage.conf`, which is enough headroom for the multi-worker 
concurrent-append pattern.
   - The "multiple workers writing the same Iceberg table" design. That is 
intentional for HashJoin build/probe partitions and other partitioned operators.
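
For a rough sense of how much waiting the "10 retries with 100ms→10s backoff" budget allows, here is a small arithmetic sketch. It assumes the delay doubles after every retry, is capped at the maximum, and has no jitter; the actual growth factor and jitter are not stated in this PR, and `CommitBackoffSketch` and `schedule` are hypothetical names.

```scala
// Arithmetic sketch under stated assumptions (doubling, capped, no jitter).
object CommitBackoffSketch {

  /** Wait in milliseconds before each retry, starting at minWaitMs,
    * doubling each time, and never exceeding maxWaitMs.
    */
  def schedule(retries: Int, minWaitMs: Long, maxWaitMs: Long): List[Long] =
    Iterator
      .iterate(minWaitMs)(wait => math.min(wait * 2, maxWaitMs))
      .take(retries)
      .toList

  def main(args: Array[String]): Unit = {
    val waits = schedule(retries = 10, minWaitMs = 100L, maxWaitMs = 10000L)
    println(waits.mkString(", "))
    println(s"total wait: ${waits.sum} ms")
  }
}
```

Under these assumptions the waits are 100, 200, 400, 800, 1600, 3200, 6400, 10000, 10000, 10000 ms, about 42.7 s in total before the budget is exhausted.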
   
   ### Any related issues, documentation, discussions?
   
   Closes #4682.
   
   ### How was this PR tested?
   
   - `sbt 'WorkflowExecutionService / Test / compile'` clean.
   - `sbt 'WorkflowExecutionService / Test / testOnly OutputManagerSpec 
DataProcessingSpec'` clean — happy path is unchanged when the writer thread 
does not fail.
   - The fatal path relies on the existing `Controller` `AllForOneStrategy` 
supervisor, which other tests already cover. I did not add a deterministic 
Iceberg-failure unit test in this PR; a follow-up could add one by injecting a 
faulty `BufferedItemWriter` and asserting that the `FatalError` is emitted.
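
A possible shape for that follow-up, as a self-contained sketch rather than real Texera test code. `ItemWriter`, `Terminate`, `WriterThread`, `FailingOnCloseWriter`, and `closeAndRethrow` are hypothetical stand-ins for `BufferedItemWriter`, `PortStorageWriterTerminateSignal`, `OutputPortResultWriterThread`, and `closeOutputStorageWriterIfNeeded`. It covers only the capture-and-rethrow steps; asserting the `FatalError` would additionally need the actor and controller in the loop.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.util.Try
import scala.util.control.NonFatal

object WriterFailureSketch {
  // Hypothetical stand-in for BufferedItemWriter: only the calls used in the diff.
  trait ItemWriter[T] {
    def putOne(item: T): Unit
    def close(): Unit
  }

  // Hypothetical stand-in for PortStorageWriterTerminateSignal.
  case object Terminate

  // Simplified copy of the patched writer thread from the diff above.
  class WriterThread[T](writer: ItemWriter[T]) extends Thread {
    val queue = new LinkedBlockingQueue[Either[T, Terminate.type]]()
    @volatile private var failure: Option[Throwable] = None
    def getFailure: Option[Throwable] = failure

    override def run(): Unit =
      try {
        var stop = false
        while (!stop) {
          queue.take() match {
            case Left(item) => writer.putOne(item)
            case Right(_)   => stop = true
          }
        }
        writer.close()
      } catch {
        case NonFatal(e) => failure = Some(e)
      }
  }

  // Injected fault: accepts tuples but fails on close().
  class FailingOnCloseWriter[T] extends ItemWriter[T] {
    def putOne(item: T): Unit = ()
    def close(): Unit = throw new IllegalStateException("simulated commit failure")
  }

  // Mirrors the patched close path: signal termination, join, then rethrow.
  def closeAndRethrow[T](thread: WriterThread[T]): Unit = {
    thread.queue.put(Right(Terminate))
    thread.join()
    thread.getFailure.foreach(e => throw e)
  }

  def main(args: Array[String]): Unit = {
    val thread = new WriterThread[String](new FailingOnCloseWriter[String])
    thread.start()
    thread.queue.put(Left("tuple-1"))
    println(Try(closeAndRethrow(thread)))
  }
}
```

A real test would swap the stand-ins for the actual classes and assert on the thrown exception instead of printing it.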
   
   ### Was this PR authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (Opus 4.7, 1M context)
   

