HyukjinKwon opened a new pull request, #56722:
URL: https://github.com/apache/spark/pull/56722
### What changes were proposed in this pull request?
Two changes:
1. **Deflake `PythonPipelineSuite."flow progress events have correct python source code location"`.**
The test defined **two** flows writing to the same streaming table
`table1`: the implicit flow
created by `@dp.table def table1()` and the
`@dp.append_flow(target='table1') def standalone_flow1()`
append flow. Because the two flows have no data dependency on each other,
the pipeline scheduler
runs them **concurrently**, and since both write to the same file-based
streaming table they share a
single `FileStreamSink` `_spark_metadata` commit log. They then race on
the batch-0 commit in
`ManifestFileCommitProtocol.commitJob`, which throws
`IllegalStateException("Race while writing batch 0")`.
This made the test fail intermittently: it passes on most master
runs but failed in the run linked below.
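The collision described above can be modelled in a few lines. This is only a toy sketch, not Spark code: `CommitLog`, `commit_job`, and `run_flows` are hypothetical names, and the model assumes the commit log accepts a given batch id exactly once. It also leaves out the sink's "already committed" check, so here the shared-log case collides on every run, whereas in the real test the failure depends on timing.

```python
import threading


class CommitLog:
    """Toy stand-in for a sink's `_spark_metadata` log (hypothetical, not Spark's class)."""

    def __init__(self):
        self._lock = threading.Lock()
        self._batches = {}

    def add(self, batch_id, files):
        # Atomic create-if-absent: only the first writer of a batch id wins.
        with self._lock:
            if batch_id in self._batches:
                return False
            self._batches[batch_id] = list(files)
            return True


def commit_job(log, batch_id, files):
    # Assumed behaviour: a commit that loses the batch id is reported as a race.
    if not log.add(batch_id, files):
        raise RuntimeError(f"Race while writing batch {batch_id}")


def run_flows(logs_by_flow):
    """Commit batch 0 once per flow, concurrently; return the flows that failed."""
    failed = []
    barrier = threading.Barrier(len(logs_by_flow))

    def worker(flow, log):
        barrier.wait()  # release all writers at roughly the same time
        try:
            commit_job(log, 0, [f"{flow}-part-0"])
        except RuntimeError:
            failed.append(flow)

    threads = [threading.Thread(target=worker, args=item) for item in logs_by_flow.items()]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return failed


# Before the fix: both flows commit to one shared log.
shared = CommitLog()
shared_failures = run_flows({"table1": shared, "standalone_flow1": shared})

# After the fix: each flow has its own log.
separate_failures = run_flows({"table1": CommitLog(), "standalone_flow1": CommitLog()})
```

With a shared log exactly one of the two writers loses batch 0, although which one varies between runs. With separate logs neither fails.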
The fix points `standalone_flow1` at its own dedicated streaming table
created with
`dp.create_streaming_table('st2')`, so each flow writes to a distinct
sink and there is no shared
`_spark_metadata`. The test's intent (verifying that an append flow's
Python source-code location is
propagated) is unchanged; the asserted line numbers are preserved (the
`target` is edited in place and
the `create_streaming_table` call is added below the asserted `def` lines
so nothing shifts).
Note: serializing the two flows would **not** be a correct alternative.
`FileStreamSink.addBatch` skips a batch when `batchId <= latestBatchId`,
so a serialized second writer would silently drop its batch rather than
raise an error. Giving each flow its own sink is the correct fix.
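To see why serialization would lose data rather than fail, here is a minimal sketch of the skip rule quoted above. `ToySink` and `add_batch` are hypothetical names for illustration only. The one thing taken from the description is the `batchId <= latestBatchId` check; the rest is an assumption.

```python
class ToySink:
    """Hypothetical model of a file sink that tracks the latest committed batch id."""

    def __init__(self):
        self.latest_batch_id = -1  # nothing committed yet
        self.rows = []

    def add_batch(self, batch_id, rows):
        # Rule from the description: an already-committed batch id is skipped.
        if batch_id <= self.latest_batch_id:
            return False  # silently dropped, no exception raised
        self.rows.extend(rows)
        self.latest_batch_id = batch_id
        return True


sink = ToySink()
# Two flows run one after the other, each starting at batch 0 on the same sink.
first_written = sink.add_batch(0, ["row from table1 flow"])
second_written = sink.add_batch(0, ["row from standalone_flow1"])
```

In this model the second call returns `False` and its row never reaches `sink.rows`, which is the silent drop the note warns about.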
2. **Add diagnostics to the `ManifestFileCommitProtocol` batch-commit
race.** Previously `commitJob`
threw a bare `Race while writing batch N`. It now logs the sink
`_spark_metadata` path and the
batchId at ERROR, and the exception message names the likely cause
(multiple concurrent streaming
queries writing to the same output path). This way, if the same class of
race ever resurfaces in a
scheduled job or a real pipeline, it is diagnosable from the logs alone
without having to reproduce it.
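As a rough illustration of the kind of diagnostics described in item 2, the sketch below logs the metadata path and batch id at ERROR and names the likely cause in the exception text. It is written in Python for brevity, whereas the real change is in Spark's Scala code. The function name `race_error`, the message wording, and the example path are all assumptions, not the text actually added by this PR.

```python
import logging

logger = logging.getLogger("manifest_commit_sketch")


def race_error(metadata_path, batch_id):
    """Log the commit context at ERROR and build a self-explanatory exception."""
    logger.error(
        "Race while writing batch %d to sink metadata log %s", batch_id, metadata_path
    )
    return RuntimeError(
        f"Race while writing batch {batch_id} (metadata log: {metadata_path}). "
        "Likely cause: multiple concurrent streaming queries writing to the "
        "same output path."
    )


# Hypothetical path, used only to show the resulting message.
err = race_error("/tmp/out/_spark_metadata", 0)
message = str(err)
```

Putting the path and batch id in both the log line and the exception means either one is enough to identify the contended sink.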
### Why are the changes needed?
`PythonPipelineSuite` is flaky on CI (the race surfaces
intermittently in the
`streaming, sql-kafka-0-10, ..., connect, avro` module group). The goal is a
green, non-flaky build.
### Does this PR introduce _any_ user-facing change?
No. The only production change is a clearer error message / additional ERROR
log when the
(already-existing) batch-commit race is hit.
### How was this patch tested?
A trimmed workflow on a fork builds `sql/connect/server` and runs **only**
`PythonPipelineSuite`,
**repeated 10×** (a single green run does not prove a flaky race is gone).
All 10 runs were green
(106 tests, 0 failures each).
- ❌ Before (race observed) — apache/spark master push run, module group
incl. `connect`:
https://github.com/apache/spark/actions/runs/27961702441/job/82748952932
- ✅ After (10× green) — fork validation run, `PythonPipelineSuite` x10:
https://github.com/HyukjinKwon/spark/actions/runs/28074747108
This pull request and its description were written by Isaac.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]