Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/20150#discussion_r161341002
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
---
@@ -318,6 +318,84 @@ class KafkaSourceSuite extends KafkaSourceTest {
)
}
+ test("union bug in failover") {
+ def getSpecificDF(range: Range.Inclusive):
org.apache.spark.sql.Dataset[Int] = {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, range.map(_.toString).toArray, Some(0))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("maxOffsetsPerTrigger", 5)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+
+ reader.load()
+ .selectExpr("CAST(value AS STRING)")
+ .as[String]
+ .map(k => k.toInt)
+ }
+
+ val df1 = getSpecificDF(0 to 9)
+ val df2 = getSpecificDF(100 to 199)
+
+ val kafka = df1.union(df2)
+
+ val clock = new StreamManualClock
+
+ val waitUntilBatchProcessed = AssertOnQuery { q =>
+ eventually(Timeout(streamingTimeout)) {
+ if (!q.exception.isDefined) {
+ assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+ }
+ }
+ if (q.exception.isDefined) {
+ throw q.exception.get
+ }
+ true
+ }
+
+ testStream(kafka)(
+ StartStream(ProcessingTime(100), clock),
+ waitUntilBatchProcessed,
+ // 5 from smaller topic, 5 from bigger one
+ CheckAnswer(0, 1, 2, 3, 4, 100, 101, 102, 103, 104),
--- End diff --
You can clean up this code a bit by using the following snippet:
```
testStream(kafka)(
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckLastBatch((0 to 4) ++ (100 to 104): _*),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// 5 from smaller topic, 5 from bigger one
CheckLastBatch((5 to 9) ++ (105 to 109): _*),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smaller topic empty, 5 from bigger one
CheckLastBatch(110 to 114: _*),
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
// smallest now empty, 5 from bigger one
CheckLastBatch(115 to 119: _*),
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 5 from bigger one
CheckLastBatch(120 to 124: _*)
)
```
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]