HeartSaVioR commented on code in PR #38911: URL: https://github.com/apache/spark/pull/38911#discussion_r1042164208
########## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ########## @@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { assert(index == 3) } + test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " + + "during subsequent batches") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + + testUtils.sendMessages(topic, (0 until 15).map { case x => + s"foo-$x" + }.toArray, Some(0)) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 5) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // the query should fail regardless of this option + .option("failOnDataLoss", "true") + .load() + + def startTriggerAvailableNowQuery(): StreamingQuery = { + reader.writeStream + .foreachBatch((_: Dataset[Row], batchId: Long) => { + testUtils.deleteTopic(topic) + // create partitions less than the kafka data source figured out as an end state + testUtils.createTopic(topic, partitions = 3) + // offset will keep the same + testUtils.sendMessages(topic, (0 until 15).map { case x => + s"foo-$x" + }.toArray, Some(0)) + null.asInstanceOf[Unit] + }) + .trigger(Trigger.AvailableNow) + .start() + } + + val exc = intercept[Exception] { + val query = startTriggerAvailableNowQuery() + try { + assert(query.awaitTermination(streamingTimeout.toMillis)) + } finally { + query.stop() + } + } + TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " + Review Comment: New exceptions don't define a new error class. I don't feel that we have established a best practice yet for applying the error class framework to "data sources", especially third-party ones. 
(I'm pretty sure Kafka is built-in, but it also serves as a reference implementation for third-party developers.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org