GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/15820#discussion_r88758556
--- Diff:
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
---
@@ -626,3 +776,106 @@ class KafkaSourceStressSuite extends KafkaSourceTest {
iterations = 50)
}
}
+
+class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with
SharedSQLContext {
+
+ import testImplicits._
+
+ private var testUtils: KafkaTestUtils = _
+
+ private val topicId = new AtomicInteger(0)
+
+ private def newTopic(): String =
s"failOnDataLoss-${topicId.getAndIncrement()}"
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ testUtils = new KafkaTestUtils {
+ override def brokerConfiguration: Properties = {
+ val props = super.brokerConfiguration
+ // Try to make Kafka clean up messages as fast as possible.
However, there is a hard-code
+ // 30 seconds delay (kafka.log.LogManager.InitialTaskDelayMs) so
this test should run at
+ // least 30 seconds.
+ props.put("log.cleaner.backoff.ms", "100")
+ props.put("log.segment.bytes", "40")
+ props.put("log.retention.bytes", "40")
+ props.put("log.retention.check.interval.ms", "100")
+ props.put("delete.retention.ms", "10")
+ props.put("log.flush.scheduler.interval.ms", "10")
+ props
+ }
+ }
+ testUtils.setup()
+ }
+
+ override def afterAll(): Unit = {
+ if (testUtils != null) {
+ testUtils.teardown()
+ testUtils = null
+ super.afterAll()
+ }
+ }
+
+ test("stress test for failOnDataLoss=false") {
+ val reader = spark
+ .readStream
+ .format("kafka")
+ .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+ .option("kafka.metadata.max.age.ms", "1")
+ .option("subscribePattern", "failOnDataLoss.*")
+ .option("startingOffsets", "earliest")
+ .option("failOnDataLoss", "false")
+ val kafka = reader.load()
+ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+ .as[(String, String)]
+ val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new
ForeachWriter[Int] {
+
+ override def open(partitionId: Long, version: Long): Boolean = {
+ true
+ }
+
+ override def process(value: Int): Unit = {
+ // Slow down the processing speed so that messages may be aged out.
+ Thread.sleep(Random.nextInt(100))
+ }
+
+ override def close(errorOrNull: Throwable): Unit = {
+ }
+ }).start()
+
+ val testTime = 1.minutes
+ val startTime = System.currentTimeMillis()
+ val topics = mutable.ArrayBuffer[String]()
+ while (System.currentTimeMillis() - testTime.toMillis < startTime) {
+ Random.nextInt(6) match {
+ case 0 =>
+ val topic = newTopic()
+ topics += topic
+ testUtils.createTopic(topic, partitions = 1)
+ case 1 =>
--- End diff --
Please update the test to also recreate topics with the same names (i.e., recreate previously deleted topics).
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]