GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r160007884
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -418,11 +418,16 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
- private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
assertAwaitThread()
def notDone = {
val localCommittedOffsets = committedOffsets
- !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+ if (sources.length <= sourceIndex) {
+ false
--- End diff --
The race condition exists because `sources` is initialized to `Seq.empty`
and only later assigned the actual sources. You could instead initialize
`sources` to null, and then return `notDone = false` while `sources` is still
null. Any other mismatch should throw an error. I don't like the current code,
which hides erroneous situations.
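
A minimal sketch of what I mean, not the actual patch. `StreamExecutionSketch`, the `String` stand-in for `Source`, and the simplified `Offset` case class are all hypothetical and only there to keep the example self-contained; the real fields and types in `StreamExecution` differ.

```scala
// Hypothetical stand-in for Spark's Offset, used only for this sketch.
final case class Offset(value: Long)

// Hypothetical, heavily simplified stand-in for StreamExecution.
class StreamExecutionSketch {
  // null means "sources have not been resolved yet", which is distinguishable
  // from a query that legitimately has zero sources (Seq.empty).
  @volatile var sources: Seq[String] = null
  @volatile var committedOffsets: Map[String, Offset] = Map.empty

  def notDone(sourceIndex: Int, newOffset: Offset): Boolean = {
    // Read the volatile fields once so the checks below see consistent values.
    val localSources = sources
    if (localSources == null) {
      // Sources not assigned yet: return false, as suggested in the comment.
      false
    } else {
      // Any other mismatch is a caller error and should fail loudly.
      require(
        sourceIndex >= 0 && sourceIndex < localSources.length,
        s"Invalid source index $sourceIndex for ${localSources.length} source(s)")
      val source = localSources(sourceIndex)
      val localCommittedOffsets = committedOffsets
      !localCommittedOffsets.contains(source) ||
        localCommittedOffsets(source) != newOffset
    }
  }
}
```

With this shape, an out-of-range index raises an `IllegalArgumentException` instead of being silently treated as "done".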
---