bobbai00 opened a new issue, #5855:
URL: https://github.com/apache/texera/issues/5855

   ## Summary
   
   `SyncExecutionResource.executeWorkflowSync` waits before reading an 
execution's results from storage (today: two unconditional `Thread.sleep(500)` 
calls). PR #5714 proposes replacing those sleeps with a bounded readiness-poll 
that compares the in-memory output-stats count against the row count committed 
to result storage.
   
   **The core issue is not `sleep(500)` versus a poll versus any other waiting 
mechanism. It is that the wait itself is redundant.** The engine already 
guarantees that an operator's result storage is durably committed *before* that 
operator is observable as `COMPLETED`, and both code paths that read results in 
`SyncExecutionResource` are gated on `COMPLETED`. So by the time we read, the 
data is guaranteed to be present — no sleep, no poll, and no stats-vs-storage 
count comparison is needed.
   
   Swapping the sleep for a poll trades one heuristic ("wait a fixed time and 
hope") for another ("compare two numbers that are themselves racy and hope"). 
Neither addresses the fact that the synchronization we're approximating is 
something the engine already provides as a hard happens-before guarantee.
   
   ## Why the wait is redundant
   
   Although the commit itself runs on the writer thread, the worker's 
data-processing thread blocks until that commit finishes, so the result is 
committed *before* the worker reports the port/operator complete:
   
   1. `OutputManager.finalizeOutput()` queues `FinalizePort(port, input=false)` 
for each output port **before** `FinalizeExecutor` 
(`amber/.../messaginglayer/OutputManager.scala:278-284`).
   2. Processing `FinalizePort(port, false)` calls 
`OutputManager.closeOutputStorageWriterIfNeeded(port)`, which sends a terminate 
signal and then **blocks on `writerThread.join()`** 
(`OutputManager.scala:257-264`). The writer thread's `finally` runs 
`IcebergTableWriter.close()` → `flushBuffer()` → 
`table.newAppend().appendFile(...).commit()` 
(`common/workflow-core/.../iceberg/IcebergTableWriter.scala:106-135,140-144`). 
After `join()` returns, every row is committed. `getFailure.foreach(throw _)` 
re-throws a failed commit, so a commit failure becomes a `FatalError` (→ 
`FAILED`/`KILLED`), never `COMPLETED`.
   3. **Only then** does the worker send `portCompleted` 
(`DataProcessor.scala:174-181`).
   4. Processing `FinalizeExecutor` afterward transitions the worker to 
`COMPLETED` and sends `workerExecutionCompleted` 
(`DataProcessor.scala:159-173`).
   
   So: **commit happens-before the port/worker is reported `COMPLETED`.** Since 
the read side (`IcebergDocument.getCount`/`get()`) reloads fresh catalog 
metadata on each call, a read taken after observing `COMPLETED` always sees the 
full result — including across the decoupled compute-unit process boundary.
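   
   The ordering in steps 1-4 can be illustrated with a minimal, self-contained 
sketch. Every name here (`FakeStorage`, `FakeWriterThread`, 
`JoinBarrierSketch`) is hypothetical and stands in for the real classes; none 
of it is Texera code, and for brevity the commit sits in the `try` body rather 
than in a `finally`. It relies only on the JVM guarantee that everything a 
thread does happens-before another thread's successful return from `join()` on 
it.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for result storage (not Texera's IcebergDocument).
final class FakeStorage {
  private var committed: Vector[Int] = Vector.empty
  def commit(rows: Seq[Int]): Unit = synchronized { committed = committed ++ rows }
  def getCount: Int = synchronized { committed.size }
}

// Hypothetical writer thread: buffers rows and commits them once terminated.
final class FakeWriterThread(storage: FakeStorage, failOnCommit: Boolean) extends Thread {
  private val queue = new LinkedBlockingQueue[Option[Int]]()
  @volatile private var failure: Option[Throwable] = None

  def put(row: Int): Unit = queue.put(Some(row))
  def terminate(): Unit = queue.put(None) // terminate signal
  def getFailure: Option[Throwable] = failure

  override def run(): Unit = {
    val buffer = ArrayBuffer.empty[Int]
    try {
      var done = false
      while (!done) {
        queue.take() match {
          case Some(row) => buffer += row
          case None      => done = true
        }
      }
      if (failOnCommit) throw new RuntimeException("simulated commit failure")
      storage.commit(buffer.toSeq) // the "commit" step
    } catch {
      case t: Throwable => failure = Some(t) // recorded for the joining thread
    }
  }
}

object JoinBarrierSketch {
  // Mirrors the ordering above: terminate, join, re-throw, and only then report.
  def runOnce(rows: Seq[Int], failOnCommit: Boolean): (String, Int) = {
    val storage = new FakeStorage
    val writer = new FakeWriterThread(storage, failOnCommit)
    writer.start()
    rows.foreach(writer.put)
    val state =
      try {
        writer.terminate()
        writer.join() // blocks until the commit attempt has finished
        writer.getFailure.foreach(t => throw t)
        "COMPLETED" // only reachable after a successful commit
      } catch {
        case _: RuntimeException => "FAILED"
      }
    (state, storage.getCount) // read immediately, with no sleep or poll
  }
}
```

   In this model, an observer that sees `"COMPLETED"` always reads the full 
row count, and a failed commit can only ever surface as `"FAILED"`.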
   
   Both termination paths in `SyncExecutionResource` that read results are 
already gated on `COMPLETED`:
   - `TerminalStateReached(COMPLETED)`
   - `TargetResultsReady` — fires on `allTargetsCompleted(stats)`, i.e. every 
target operator's state is `COMPLETED`.
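   
   As a rough illustration of the `TargetResultsReady` gate, the predicate 
below models "every target operator's state is `COMPLETED`". The state types, 
the signature, and the choice to treat an empty target set as not ready are 
assumptions made for this sketch; the real `allTargetsCompleted(stats)` in 
`SyncExecutionResource` may be shaped differently.

```scala
// Hypothetical, simplified operator states; not Texera's actual types.
sealed trait OpState
case object Running extends OpState
case object Completed extends OpState
case object Failed extends OpState

object CompletionGate {
  // True only when every target has a reported state and that state is Completed.
  // An operator missing from `stats` counts as not completed.
  def allTargetsCompleted(targets: Set[String], stats: Map[String, OpState]): Boolean =
    targets.nonEmpty && targets.forall(id => stats.get(id).contains(Completed))
}
```

   Because the gate admits only `COMPLETED`, and `COMPLETED` is reported only 
after the commit, a read placed directly behind this check needs no further 
waiting.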
   
   The old "RegionExecutionCoordinator caches upstream results asynchronously 
after operators complete" comment that motivated the sleep no longer 
corresponds to any code; the synchronous `writerThread.join()` barrier 
(introduced with state materialization across regions) closed that gap.
   
   ## Proposed resolution
   
   - **Remove the wait** from `SyncExecutionResource` (rather than replace it 
with a poll), and document the invariant it relies on.
   - **Lock the invariant with engine-side tests**, since removing the safety 
net turns a regression from "a slow response" into "a silent short read":
     - The commit + failure-propagation half is already covered by 
`OutputPortStorageWriterThreadSpec`.
     - Add an e2e spec that reads result storage the instant the workflow 
reports `COMPLETED` (no wait) and asserts the committed row count (`getCount`) 
equals the rows actually readable, across a multi-region DAG including an 
intermediate operator.
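   
   The core assertion of the proposed e2e spec could look roughly like the 
following. `ResultDocument` is a hypothetical minimal view exposing only 
`getCount` and `get()`; the real `IcebergDocument` API, the way the spec 
obtains each operator's document, and the test framework wiring are all left 
out, so this is a sketch of the check rather than the spec itself.

```scala
// Hypothetical minimal view of a result document; the real API may differ.
trait ResultDocument[T] {
  def getCount: Long
  def get(): Iterator[T]
}

// In-memory fake used only to exercise the check below.
final class InMemoryDoc(rows: Vector[Int], reportedCount: Long) extends ResultDocument[Int] {
  def getCount: Long = reportedCount
  def get(): Iterator[Int] = rows.iterator
}

final case class Mismatch(opId: String, committed: Long, readable: Long)

object NoWaitReadCheck {
  // Called right after COMPLETED is observed, with no sleep or poll in between.
  // Returns one Mismatch per operator whose committed count differs from the
  // number of rows actually readable; an empty list means no short read.
  def shortReads[T](docs: Map[String, ResultDocument[T]]): List[Mismatch] =
    docs.toList.flatMap {
      case (opId, doc) =>
        val committed = doc.getCount
        val readable = doc.get().size.toLong
        if (committed == readable) Nil else List(Mismatch(opId, committed, readable))
    }
}
```

   The spec would then assert that `shortReads` is empty for every operator 
in the multi-region DAG, intermediate operators included.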
   
   ## References
   
   - PR #5714: the sleep→poll approach whose framing this issue argues 
against.
   - #5713: the issue that PR #5714 is set to close.
   - Key engine seam: `OutputManager.closeOutputStorageWriterIfNeeded` 
(`writerThread.join()`) → `DataProcessor.outputOneTuple` (`portCompleted` / 
`COMPLETED`).
   
   ---
   *Filed with assistance from Claude (Opus 4.8) in compliance with the ASF 
generative-AI tooling policy.*
   

