Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r160550392
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
---
@@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
}
}
- test("(de)serialization of initial offsets") {
+ test("KafkaSource with watermark") {
+ val now = System.currentTimeMillis()
val topic = newTopic()
- testUtils.createTopic(topic, partitions = 64)
+ testUtils.createTopic(newTopic(), partitions = 1)
+ testUtils.sendMessages(topic, Array(1).map(_.toString))
- val reader = spark
+ val kafka = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("startingOffsets", s"earliest")
.option("subscribe", topic)
+ .load()
- testStream(reader.load)(
- makeSureGetOffsetCalled,
- StopStream,
- StartStream(),
- StopStream)
+ val windowedAggregation = kafka
+ .withWatermark("timestamp", "10 seconds")
+ .groupBy(window($"timestamp", "5 seconds") as 'window)
+ .agg(count("*") as 'count)
+ .select($"window".getField("start") as 'window, $"count")
+
+ val query = windowedAggregation
+ .writeStream
+ .format("memory")
+ .outputMode("complete")
+ .queryName("kafkaWatermark")
+ .start()
+ query.processAllAvailable()
+ val rows = spark.table("kafkaWatermark").collect()
+ assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+ val row = rows(0)
+ // We cannot check the exact window start time as it depends on the time that messages were
+ // inserted by the producer. So here we just use a lower bound to make sure the internal
+ // conversion works.
+ assert(
+ row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+ s"Unexpected results: $row")
+ assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+ query.stop()
}
+}
- test("maxOffsetsPerTrigger") {
+class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+ import testImplicits._
+
+ test("(de)serialization of initial offsets") {
--- End diff --
Is this needed in the common KafkaSourceSuiteBase?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]