Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20096#discussion_r160007884
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
    @@ -418,11 +418,16 @@ abstract class StreamExecution(
        * Blocks the current thread until processing for data from the given 
`source` has reached at
        * least the given `Offset`. This method is intended for use primarily 
when writing tests.
        */
    -  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
    +  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit 
= {
         assertAwaitThread()
         def notDone = {
           val localCommittedOffsets = committedOffsets
    -      !localCommittedOffsets.contains(source) || 
localCommittedOffsets(source) != newOffset
    +      if (sources.length <= sourceIndex) {
    +        false
    --- End diff --
    
    The race condition is present because `sources` is initialized to Seq.empty 
and then assigned to the actual sources. You can actually initialize `sources` 
to null, and then return `notDone = false` when `sources` is null. Any other 
mismatch should throw error. I dont like this current code which hides 
erroneous situations.
      


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to