Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r160550486
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---
@@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
}
}
- test("(de)serialization of initial offsets") {
+ test("KafkaSource with watermark") {
+ val now = System.currentTimeMillis()
val topic = newTopic()
- testUtils.createTopic(topic, partitions = 64)
+ testUtils.createTopic(topic, partitions = 1)
+ testUtils.sendMessages(topic, Array(1).map(_.toString))
- val reader = spark
+ val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("startingOffsets", s"earliest")
.option("subscribe", topic)
+ .load()
- testStream(reader.load)(
- makeSureGetOffsetCalled,
- StopStream,
- StartStream(),
- StopStream)
+ val windowedAggregation = kafka
+ .withWatermark("timestamp", "10 seconds")
+ .groupBy(window($"timestamp", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start") as 'window, $"count")
+
+ val query = windowedAggregation
+ .writeStream
+ .format("memory")
+ .outputMode("complete")
+ .queryName("kafkaWatermark")
+ .start()
+ query.processAllAvailable()
+ val rows = spark.table("kafkaWatermark").collect()
+ assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+ val row = rows(0)
+ // We cannot check the exact window start time as it depends on the time that messages were
+ // inserted by the producer. So here we just use a lower bound to make sure the internal
+ // conversion works.
+ assert(
+ row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+ s"Unexpected results: $row")
+ assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+ query.stop()
}
+}
- test("maxOffsetsPerTrigger") {
+class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+ import testImplicits._
+
+ test("(de)serialization of initial offsets") {
val topic = newTopic()
- testUtils.createTopic(topic, partitions = 3)
- testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0))
- testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1))
- testUtils.sendMessages(topic, Array("1"), Some(2))
+ testUtils.createTopic(topic, partitions = 5)
val reader = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("maxOffsetsPerTrigger", 10)
.option("subscribe", topic)
- .option("startingOffsets", "earliest")
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt)
-
- val clock = new StreamManualClock
-
- val waitUntilBatchProcessed = AssertOnQuery { q =>
- eventually(Timeout(streamingTimeout)) {
- if (!q.exception.isDefined) {
- assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
- }
- }
- if (q.exception.isDefined) {
- throw q.exception.get
- }
- true
- }
- testStream(mapped)(
- StartStream(ProcessingTime(100), clock),
- waitUntilBatchProcessed,
- // 1 from smallest, 1 from middle, 8 from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // smallest now empty, 1 more from middle, 9 more from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
- 11, 108, 109, 110, 111, 112, 113, 114, 115, 116
- ),
+ testStream(reader.load)(
+ makeSureGetOffsetCalled,
StopStream,
- StartStream(ProcessingTime(100), clock),
- waitUntilBatchProcessed,
- // smallest now empty, 1 more from middle, 9 more from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
- 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
- 12, 117, 118, 119, 120, 121, 122, 123, 124, 125
- ),
- AdvanceManualClock(100),
- waitUntilBatchProcessed,
- // smallest now empty, 1 more from middle, 9 more from biggest
- CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
- 11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
- 12, 117, 118, 119, 120, 121, 122, 123, 124, 125,
- 13, 126, 127, 128, 129, 130, 131, 132, 133, 134
- )
- )
+ StartStream(),
+ StopStream)
}
test("cannot stop Kafka stream") {
--- End diff ---
Is this needed in KafkaSourceSuiteBase?
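
For context, a minimal sketch of the layout this question refers to: tests defined in a shared base suite are inherited and run by every concrete suite that extends it, so anything placed in KafkaSourceSuiteBase executes in all of its subclasses. The class and test names below are illustrative only, not the exact contents of this PR.

    import org.scalatest.funsuite.AnyFunSuite

    // Shared tests live in the abstract base suite; every concrete suite
    // that extends it inherits and runs them. (Illustrative names, plain
    // ScalaTest used here instead of the Spark test harness.)
    abstract class ExampleSourceSuiteBase extends AnyFunSuite {
      test("(de)serialization of initial offsets") {
        // shared assertions would go here
      }
    }

    // Each concrete suite automatically picks up the shared tests above
    // and can add its own mode-specific tests.
    class ExampleMicroBatchSourceSuite extends ExampleSourceSuiteBase {
      test("micro-batch specific behaviour") {
        // mode-specific assertions would go here
      }
    }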