Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/15527#discussion_r85246059
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -133,6 +132,41 @@ class KafkaSourceSuite extends KafkaSourceTest {
   private val topicId = new AtomicInteger(0)
+  test("maxOffsetsPerTrigger") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 3)
+    testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
+    testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
+    testUtils.sendMessages(topic, Array("1"), Some(2))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 10)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
+
+    val clock = new StreamManualClock
+    testStream(mapped)(
+      StartStream(ProcessingTime(100), clock),
+      AdvanceManualClock(100),
+      // 1 from smallest, 1 from middle, 8 from biggest
+      CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
--- End diff ---
There is a race condition here: `AdvanceManualClock` only unblocks the trigger, it does not wait for the batch to finish, so `CheckAnswer` may run while the batch is still executing. I ended up with the following code, which also covers recovery (stop/restart) and finally fixes the race condition:
```scala
test("maxOffsetsPerTrigger") {
  val topic = newTopic()
  testUtils.createTopic(topic, partitions = 3)
  testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
  testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
  testUtils.sendMessages(topic, Array("1"), Some(2))

  val reader = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("kafka.metadata.max.age.ms", "1")
    .option("maxOffsetsPerTrigger", 10)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
  val kafka = reader.load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)]
  val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)

  val clock = new StreamManualClock

  // Blocks until the query has finished the current batch and is waiting on
  // the manual clock again, so the CheckAnswers below never race the batch.
  val waitUntilBatchProcessed = AssertOnQuery { q =>
    eventually(Timeout(streamingTimeout)) {
      if (!q.exception.isDefined) {
        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
      }
    }
    if (q.exception.isDefined) {
      throw q.exception.get
    }
    true
  }

  testStream(mapped)(
    StartStream(ProcessingTime(100), clock),
    waitUntilBatchProcessed,
    // 1 from smallest, 1 from middle, 8 from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    // smallest now empty, 1 more from middle, 9 more from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
      11, 108, 109, 110, 111, 112, 113, 114, 115, 116
    ),
    StopStream,
    StartStream(ProcessingTime(100), clock),
    waitUntilBatchProcessed,
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    // smallest now empty, 1 more from middle, 9 more from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
      11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
      12, 117, 118, 119, 120, 121, 122, 123, 124, 125
    ),
    AdvanceManualClock(100),
    waitUntilBatchProcessed,
    // smallest now empty, 1 more from middle, 9 more from biggest
    CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
      11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
      12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
      13, 126, 127, 128, 129, 130, 131, 132, 133, 134
    )
  )
}
```
This test currently fails because of an issue that is being fixed in #14553.
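For anyone puzzling over the expected answers: the `// 1 from smallest, 1 from middle, 8 from biggest` comments reflect the 10-record budget being split across the three partitions roughly in proportion to their backlogs (initially 101, 11, and 1 records). Below is a minimal sketch of such a proportional split, assuming flooring division with a one-record minimum for non-empty partitions; the helper name is made up and this is an illustration, not the actual KafkaSource rate-limiting code:
```scala
// Sketch only: split a per-trigger record budget across partitions in
// proportion to their backlogs. Assumed behavior for illustration, not the
// real KafkaSource implementation.
def proportionalSplit(limit: Long, backlog: Map[Int, Long]): Map[Int, Long] = {
  val total = backlog.values.sum.toDouble
  backlog.map { case (partition, lag) =>
    // Floor of the proportional share, but read at least one record from any
    // partition that still has data, and never more than its backlog.
    partition -> math.max(1L, (limit * lag / total).toLong).min(lag)
  }
}

// First trigger: backlogs 101 / 11 / 1, budget 10 => 8 / 1 / 1.
proportionalSplit(10, Map(0 -> 101L, 1 -> 11L, 2 -> 1L))
// Second trigger: backlogs 93 / 10 / 0, budget 10 => 9 / 1 / 0.
proportionalSplit(10, Map(0 -> 93L, 1 -> 10L, 2 -> 0L))
```
Under those assumptions the sketch reproduces the per-batch counts asserted above; the key point of the test itself is that each `CheckAnswer` grows by exactly 10 records per trigger.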