Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22207#discussion_r212706003

    --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDontFailOnDataLossSuite.scala ---
    @@ -0,0 +1,281 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.kafka010
    +
    +import java.util.Properties
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import scala.collection.mutable
    +import scala.util.Random
    +
    +import org.scalatest.time.SpanSugar._
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter}
    +import org.apache.spark.sql.streaming.{StreamTest, Trigger}
    +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
    +
    +/**
    + * A basic test trait that sets up a Kafka cluster which keeps only a few records in a topic
    + * and ages out records very quickly. This is a helper trait for testing the
    + * "failOnDataLoss=false" case with missing offsets.
    + *
    + * Note: there is a hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs) before
    + * records are cleaned up, so each class extending this trait needs to wait at least 30 seconds
    + * (or even longer when running on a slow Jenkins machine) before records start to be removed.
    + * To make sure a test actually sees missing offsets, check the earliest offset in `eventually`
    + * and assert that it's not 0, rather than sleeping for a hard-coded duration.
    + */
    +trait KafkaMissingOffsetsTest extends SharedSQLContext {
    +
    +  protected var testUtils: KafkaTestUtils = _
    +
    +  override def createSparkSession(): TestSparkSession = {
    +    // Set maxRetries to 3 to handle NPE from `poll` when deleting a topic
    +    new TestSparkSession(new SparkContext("local[2,3]", "test-sql-context", sparkConf))
    +  }
    +
    +  override def beforeAll(): Unit = {
    +    super.beforeAll()
    +    testUtils = new KafkaTestUtils {
    +      override def brokerConfiguration: Properties = {
    +        val props = super.brokerConfiguration
    +        // Try to make Kafka clean up messages as fast as possible. However, there is a
    +        // hard-coded 30-second delay (kafka.log.LogManager.InitialTaskDelayMs), so this test
    +        // should run for at least 30 seconds.
    +        props.put("log.cleaner.backoff.ms", "100")
    +        // The size of RecordBatch V2 increases to support transactional write.
    +        props.put("log.segment.bytes", "70")
    +        props.put("log.retention.bytes", "40")
    +        props.put("log.retention.check.interval.ms", "100")
    +        props.put("delete.retention.ms", "10")
    +        props.put("log.flush.scheduler.interval.ms", "10")
    +        props
    +      }
    +    }
    +    testUtils.setup()
    +  }
    +
    +  override def afterAll(): Unit = {
    +    if (testUtils != null) {
    +      testUtils.teardown()
    +      testUtils = null
    +    }
    +    super.afterAll()
    +  }
    +}
    +
    +class KafkaDontFailOnDataLossSuite extends KafkaMissingOffsetsTest {
    +
    +  import testImplicits._
    +
    +  private val topicId = new AtomicInteger(0)
    +
    +  private def newTopic(): String = s"failOnDataLoss-${topicId.getAndIncrement()}"
    +
    +  /**
    +   * @param testStreamingQuery whether to test a streaming query or a batch query.
    +   * @param writeToTable the function to write the specified [[DataFrame]] to the given table.
    +   */
    +  private def verifyMissingOffsetsDontCauseDuplicatedRecords(
    +      testStreamingQuery: Boolean)(writeToTable: (DataFrame, String) => Unit): Unit = {
    +    val topic = newTopic()
    +    testUtils.createTopic(topic, partitions = 1)
    +    testUtils.sendMessages(topic, (0 until 50).map(_.toString).toArray)
    +
    +    eventually(timeout(60.seconds)) {
    +      assert(
    +        testUtils.getEarliestOffsets(Set(topic)).head._2 > 0,
    +        "Kafka didn't delete records after 1 minute")
    +    }
    +
    +    val table = "DontFailOnDataLoss"
    +    withTable(table) {
    +      val df = if (testStreamingQuery) {
    +        spark.readStream
    +          .format("kafka")
    +          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +          .option("kafka.metadata.max.age.ms", "1")
    +          .option("subscribe", topic)
    +          .option("startingOffsets", s"""{"$topic":{"0":0}}""")
    +          .option("failOnDataLoss", "false")
    +          .option("kafkaConsumer.pollTimeoutMs", "1000")
    +          .load()
    +      } else {
    +        spark.read
    +          .format("kafka")
    +          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    +          .option("kafka.metadata.max.age.ms", "1")
    +          .option("subscribe", topic)
    +          .option("startingOffsets", s"""{"$topic":{"0":0}}""")
    +          .option("failOnDataLoss", "false")
    +          .option("kafkaConsumer.pollTimeoutMs", "1000")
    +          .load()
    +      }
    +      writeToTable(df.selectExpr("CAST(value AS STRING)"), table)
    +      val result = spark.table(table).as[String].collect().toList
    +      assert(result.distinct.size === result.size, s"$result contains duplicated records")
    +      // Make sure Kafka did remove some records so that this test is valid.
    +      assert(result.size > 0 && result.size < 50)
    +    }
    +  }
    +
    +  test("failOnDataLoss=false should not return duplicated records: v1") {
    +    withSQLConf(
    +      "spark.sql.streaming.disabledV2MicroBatchReaders" ->
    +        classOf[KafkaSourceProvider].getCanonicalName) {
    +      verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
    +        val query = df.writeStream.format("memory").queryName(table).start()
    +        try {
    +          query.processAllAvailable()
    +        } finally {
    +          query.stop()
    +        }
    +      }
    +    }
    +  }
    +
    +  test("failOnDataLoss=false should not return duplicated records: v2") {
    +    verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
    +      val query = df.writeStream.format("memory").queryName(table).start()
    +      try {
    +        query.processAllAvailable()
    +      } finally {
    +        query.stop()
    +      }
    +    }
    +  }
    +
    +  test("failOnDataLoss=false should not return duplicated records: continuous processing") {
    +    verifyMissingOffsetsDontCauseDuplicatedRecords(testStreamingQuery = true) { (df, table) =>
    +      val query = df.writeStream
    +        .format("memory")
    +        .queryName(table)
    +        .trigger(Trigger.Continuous(100))
    +        .start()
    +      try {
    +        eventually(timeout(60.seconds)) {
    +          assert(spark.table(table).as[String].collect().contains("49"))
    --- End diff --

    Doesn't processAllAvailable work in continuous processing?
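For context on that question, below is a minimal sketch of the two wait patterns the diff uses, side by side. The helper names (waitMicroBatch, waitContinuous) are hypothetical, and it assumes df is a streaming DataFrame like the one built by verifyMissingOffsetsDontCauseDuplicatedRecords. Whether processAllAvailable() is reliable under a continuous trigger is exactly the open question; the continuous test instead polls the memory sink with ScalaTest's eventually until the last record the test sent ("49") appears.

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.streaming.Trigger
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.SpanSugar._

    // Sketch only: helper names are hypothetical, mirroring the test code above.

    // Micro-batch: processAllAvailable() blocks until every record currently
    // available in the source has been processed, so the memory sink can be
    // read immediately after it returns.
    def waitMicroBatch(df: DataFrame, table: String): Unit = {
      val query = df.writeStream.format("memory").queryName(table).start()
      try query.processAllAvailable() finally query.stop()
    }

    // Continuous: rather than calling processAllAvailable(), poll the memory
    // sink until the last record sent by the test ("49") shows up, or fail
    // once the timeout expires.
    def waitContinuous(spark: SparkSession, df: DataFrame, table: String): Unit = {
      import spark.implicits._
      val query = df.writeStream
        .format("memory")
        .queryName(table)
        .trigger(Trigger.Continuous(100))
        .start()
      try {
        eventually(timeout(60.seconds)) {
          assert(spark.table(table).as[String].collect().contains("49"))
        }
      } finally query.stop()
    }

If processAllAvailable() did cover the continuous case, the two helpers would collapse into one, which seems to be the point of the question.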