HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042172068


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic 
partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create fewer partitions than the Kafka data source figured out as the end state
+          testUtils.createTopic(topic, partitions = 3)
+          // the offsets will stay the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) have been lost " +

Review Comment:
   E.g. the default implementation of SparkThrowable is tightly coupled with SparkThrowableHelper, whose error class reader is tied to the Spark project's global error class file. If a 3rd-party data source developer decides to (and technically has to) go with a different error class JSON file, then the default implementation no longer works. Maybe there is room for improvement here: make the utility class/object extensible.
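   To make the coupling concrete, here is a rough sketch of the kind of extension point I have in mind. This is purely hypothetical: `ErrorClassReader`, `ConnectorErrorClassReader`, and `ConnectorException` do not exist in Spark today, and the error class name and message template are made up for illustration.

```scala
// Hypothetical sketch only -- none of these names exist in Spark today.
// The idea: let a connector supply its own error-class reader instead of
// being hard-wired to Spark's global error class file via SparkThrowableHelper.

trait ErrorClassReader {
  // Resolve an error class plus message parameters to a formatted message.
  def getMessage(errorClass: String, params: Map[String, String]): String
}

// A connector could back this with its own error-class JSON resource.
// The JSON parsing is stubbed out here to keep the sketch self-contained.
class ConnectorErrorClassReader(resourcePath: String) extends ErrorClassReader {
  private val templates: Map[String, String] = Map(
    "KAFKA_DATA_LOSS.PARTITIONS_LOST" ->
      "Some of partitions in Kafka topic(s) have been lost: <partitions>"
  )

  override def getMessage(errorClass: String, params: Map[String, String]): String = {
    val template = templates.getOrElse(errorClass, s"[$errorClass] (resource: $resourcePath)")
    params.foldLeft(template) { case (msg, (name, value)) =>
      msg.replace(s"<$name>", value)
    }
  }
}

// An exception base class that takes the reader as a constructor argument,
// so message formatting is no longer tied to Spark's global helper.
class ConnectorException(
    errorClass: String,
    params: Map[String, String],
    reader: ErrorClassReader)
  extends Exception(reader.getMessage(errorClass, params))
```

   A connector could then raise `new ConnectorException("KAFKA_DATA_LOSS.PARTITIONS_LOST", Map("partitions" -> "topic-0"), reader)` with its own reader. Whether a shape like this can fit SparkThrowable's existing interface is exactly the open question above.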



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
