GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/20096#discussion_r160552160
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
---
@@ -977,20 +971,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite
extends StreamTest with Shared
}
}
- test("stress test for failOnDataLoss=false") {
- val reader = spark
- .readStream
- .format("kafka")
- .option("kafka.bootstrap.servers", testUtils.brokerAddress)
- .option("kafka.metadata.max.age.ms", "1")
- .option("subscribePattern", "failOnDataLoss.*")
- .option("startingOffsets", "earliest")
- .option("failOnDataLoss", "false")
- .option("fetchOffset.retryIntervalMs", "3000")
- val kafka = reader.load()
- .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
- .as[(String, String)]
- val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new
ForeachWriter[Int] {
+ protected def startStream(ds: Dataset[Int]) = {
--- End diff --
I think this refactoring is not needed. `startStream()` is not used anywhere
other than in this test, so I don't see the point of refactoring it to be
defined outside the test.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]