HeartSaVioR commented on code in PR #38911: URL: https://github.com/apache/spark/pull/38911#discussion_r1042172068
########## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ########## @@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { assert(index == 3) } + test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " + + "during subsequent batches") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + + testUtils.sendMessages(topic, (0 until 15).map { case x => + s"foo-$x" + }.toArray, Some(0)) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 5) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // the query should fail regardless of this option + .option("failOnDataLoss", "true") + .load() + + def startTriggerAvailableNowQuery(): StreamingQuery = { + reader.writeStream + .foreachBatch((_: Dataset[Row], batchId: Long) => { + testUtils.deleteTopic(topic) + // create partitions less than the kafka data source figured out as an end state + testUtils.createTopic(topic, partitions = 3) + // offset will keep the same + testUtils.sendMessages(topic, (0 until 15).map { case x => + s"foo-$x" + }.toArray, Some(0)) + null.asInstanceOf[Unit] + }) + .trigger(Trigger.AvailableNow) + .start() + } + + val exc = intercept[Exception] { + val query = startTriggerAvailableNowQuery() + try { + assert(query.awaitTermination(streamingTimeout.toMillis)) + } finally { + query.stop() + } + } + TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " + Review Comment: E.g. The default implementation of SparkThrowable is tightly coupled with SparkThrowableHelper which error class reader is tied to the Spark project global one. 
If a 3rd-party data source developer decides to (and technically has to) go with a different error class JSON file, then the default implementation no longer works. Maybe there is room for improvement: make the utility class/object extensible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org