HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042805980


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   Put yourself in the shoes of a 3rd-party developer and walk through how they
would use these classes to integrate their data source with the error class
framework. I know there is a README file, but I don't think it's friendly to
3rd-party developers.
   
   
https://github.com/apache/spark/tree/master/core/src/main/resources/error/README.md
   
   1. They have to define their own error-class.json, since Spark's file is not
modifiable.
   2. We guide them to leverage SparkThrowable, but the default implementation
is tied to Spark's error class JSON, so they will be surprised that it just
doesn't work and that they have to override every default implementation. There
is no documentation for this.
   3. They may understand what an error class is intended to do, but if they
have to go with an uncategorized error, they have no idea how to make sure they
pick an unused sequence number.
   4. I believe classifying internal vs. user-facing errors is one of the key
points for the UX of the error class framework, but the README does not mention
it. In fact, a developer would have no idea how the error class framework will
present the exception to end users. If they expect both kinds of errors to be
presented the same way, they will be surprised by how we handle internal errors
separately.
   
   This error class framework guideline is not mentioned anywhere in the data
source implementation doc (we don't actually have one) or in code comments.
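
   To make points 1, 2 and 4 concrete, here is a rough sketch of what a
connector author ends up writing. It is deliberately self-contained and does
not use Spark's classes: `ErrorClassThrowable` is a hypothetical stand-in for
the general shape of `SparkThrowable`, and `KafkaConnectorErrors`,
`KafkaConnectorException`, the `KAFKA_PARTITIONS_LOST` error class, the
`<name>` placeholder format and the `INTERNAL_ERROR` prefix check are all
assumptions made for illustration, not the real API or real error classes.

```scala
// Hypothetical stand-in for the shape of an error-class-aware throwable.
// This is NOT Spark's SparkThrowable; it only mirrors the idea.
trait ErrorClassThrowable {
  def getErrorClass: String
  def getMessageParameters: Map[String, String]

  // Assumed convention: internal errors share a common error class prefix,
  // so that they can be presented differently from user-facing errors.
  def isInternalError: Boolean = getErrorClass.startsWith("INTERNAL_ERROR")
}

// Connector-owned message templates. A real connector would more likely load
// these from its own JSON resource, because Spark's file cannot be modified.
object KafkaConnectorErrors {
  private val templates: Map[String, String] = Map(
    "KAFKA_PARTITIONS_LOST" ->
      "Partitions <partitions> of topic <topic> are no longer available.",
    "INTERNAL_ERROR" -> "<message>"
  )

  // Look up the template and substitute each <name> placeholder.
  def format(errorClass: String, params: Map[String, String]): String = {
    val template = templates.getOrElse(
      errorClass,
      throw new IllegalArgumentException(s"Unknown error class: $errorClass"))
    val body = params.foldLeft(template) { case (msg, (name, value)) =>
      msg.replace(s"<$name>", value)
    }
    s"[$errorClass] $body"
  }
}

// The exception resolves its message from the connector's own templates
// instead of relying on defaults tied to Spark's error class JSON.
class KafkaConnectorException(errorClass: String, params: Map[String, String])
  extends RuntimeException(KafkaConnectorErrors.format(errorClass, params))
  with ErrorClassThrowable {

  override def getErrorClass: String = errorClass
  override def getMessageParameters: Map[String, String] = params
}
```

   With this sketch,
`new KafkaConnectorException("KAFKA_PARTITIONS_LOST", Map("topic" -> "events", "partitions" -> "3, 4")).getMessage`
yields `[KAFKA_PARTITIONS_LOST] Partitions 3, 4 of topic events are no longer available.`
None of the steps involved (own templates, own message resolution, own
internal-error convention) is something a 3rd-party developer can discover
from the README today.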
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

