GitHub user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r159797164
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -418,11 +418,16 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
- private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = {
assertAwaitThread()
def notDone = {
val localCommittedOffsets = committedOffsets
- !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
+ if (sources.length <= sourceIndex) {
+ false
--- End diff --
`sources` is a var that might not be populated yet, hence the length check
before indexing into it. (This race condition showed up in AddKafkaData in
my tests.)
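
To make the race concrete, here is a minimal standalone sketch of the guard, not the actual StreamExecution code. The object name, the use of String for sources, and Long for offsets are all hypothetical stand-ins, and the `else` branch is my assumption about how the check continues, since the diff above is cut off after `false`.

```scala
// Hypothetical stand-in for illustration only; not Spark's StreamExecution.
object AwaitOffsetSketch {
  // Assumed to be assigned later by another thread, so a reader may
  // still observe the initial empty value.
  @volatile var sources: Seq[String] = Seq.empty
  @volatile var committedOffsets: Map[String, Long] = Map.empty

  // Same shape as the guarded check in the diff: if the index is out of
  // range because `sources` is not populated yet, return false instead
  // of indexing into it.
  def notDone(sourceIndex: Int, newOffset: Long): Boolean = {
    // Read each var once so both checks see a consistent snapshot.
    val localSources = sources
    val localCommittedOffsets = committedOffsets
    if (localSources.length <= sourceIndex) {
      false
    } else {
      // Assumed continuation: look up the source by index, then apply
      // the original comparison.
      val source = localSources(sourceIndex)
      !localCommittedOffsets.contains(source) ||
        localCommittedOffsets(source) != newOffset
    }
  }
}
```

Without the length check, calling `notDone(0, ...)` before `sources` is assigned would throw an IndexOutOfBoundsException in this sketch rather than return a Boolean.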
---