HeartSaVioR commented on a change in pull request #35238:
URL: https://github.com/apache/spark/pull/35238#discussion_r787402655
##########
File path:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
##########
@@ -195,6 +195,45 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
true
}
+ test("Trigger.AvailableNow") {
+ val topic = newTopic()
+ testUtils.createTopic(topic, partitions = 5)
+
+ testUtils.sendMessages(topic, (0 until 15).map{ case x =>
+ s"foo-$x"
+ }.toArray, Some(0))
+
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("maxOffsetsPerTrigger", 5)
+ .option("subscribe", topic)
+ .option("startingOffsets", "earliest")
+ .load()
+
+ var index: Int = 0
+ def startTriggerAvailableNowQuery(): StreamingQuery = {
+ reader.writeStream
+ .foreachBatch{ case (_: Dataset[Row], _: Long) =>
Review comment:
My apologies for the back and forth. I roughly remembered this working, but
it looks like it doesn't. Please revert the latest change.
```
[error]
/home/runner/work/spark/spark/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:219:22:
missing parameter type for expanded function
[error] The argument types of an anonymous function must be fully known.
(SLS 8.5)
[error] Expected type was: ?
[error] .foreachBatch{ case (_: Dataset[Row], _: Long) =>
[error] ^
[error] one error found
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]