HeartSaVioR commented on code in PR #38911: URL: https://github.com/apache/spark/pull/38911#discussion_r1042805980
########## connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala: ########## @@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { assert(index == 3) } + test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " + + "during subsequent batches") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + + testUtils.sendMessages(topic, (0 until 15).map { case x => + s"foo-$x" + }.toArray, Some(0)) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 5) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + // the query should fail regardless of this option + .option("failOnDataLoss", "true") + .load() + + def startTriggerAvailableNowQuery(): StreamingQuery = { + reader.writeStream + .foreachBatch((_: Dataset[Row], batchId: Long) => { + testUtils.deleteTopic(topic) + // create partitions less than the kafka data source figured out as an end state + testUtils.createTopic(topic, partitions = 3) + // offset will keep the same + testUtils.sendMessages(topic, (0 until 15).map { case x => + s"foo-$x" + }.toArray, Some(0)) + null.asInstanceOf[Unit] + }) + .trigger(Trigger.AvailableNow) + .start() + } + + val exc = intercept[Exception] { + val query = startTriggerAvailableNowQuery() + try { + assert(query.awaitTermination(streamingTimeout.toMillis)) + } finally { + query.stop() + } + } + TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " + Review Comment: Stand on the 3rd party developer and go with journey how they can use the classes to integrate their data source to the error class framework. I know there is a README file but I don't think it's friendly to 3rd party developer. https://github.com/apache/spark/tree/master/core/src/main/resources/error/README.md 1. They have to define their own error-class.json as the file is not modificable. 2. We guide them to leverage SparkThrowable but default implementation is tied to Spark's error class json so they will be surprised that it just doesn't work and they have to override every default implementation. No documentation for this. 3. They may understand what error class intends to do, but if they have to go with uncategorized error then they have no idea how they can ensure picking up unused sequence number. 4. I believe classifying internal vs user-facing error is one of key points for UX of error class framework, but there is no mention in the README. Actually someone would have no idea how error class framework will show the exception to the end users. If they expect the same, they will be surprised for how we handle internal errors separately. This error class framework guideline is not mentioned anywhere in the data source implementation doc(we don't have one actually)/code comment. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org