HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1042892403


##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -234,6 +235,108 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
     assert(index == 3)
   }
 
+  test("Query with Trigger.AvailableNow should throw error when topic 
partitions got unavailable " +
+    "during subsequent batches") {
+    val topic = newTopic()
+    testUtils.createTopic(topic, partitions = 5)
+
+    testUtils.sendMessages(topic, (0 until 15).map { case x =>
+      s"foo-$x"
+    }.toArray, Some(0))
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("maxOffsetsPerTrigger", 5)
+      .option("subscribe", topic)
+      .option("startingOffsets", "earliest")
+      // the query should fail regardless of this option
+      .option("failOnDataLoss", "true")
+      .load()
+
+    def startTriggerAvailableNowQuery(): StreamingQuery = {
+      reader.writeStream
+        .foreachBatch((_: Dataset[Row], batchId: Long) => {
+          testUtils.deleteTopic(topic)
+          // create partitions less than the kafka data source figured out as 
an end state
+          testUtils.createTopic(topic, partitions = 3)
+          // offset will keep the same
+          testUtils.sendMessages(topic, (0 until 15).map { case x =>
+            s"foo-$x"
+          }.toArray, Some(0))
+          null.asInstanceOf[Unit]
+        })
+        .trigger(Trigger.AvailableNow)
+        .start()
+    }
+
+    val exc = intercept[Exception] {
+      val query = startTriggerAvailableNowQuery()
+      try {
+        assert(query.awaitTermination(streamingTimeout.toMillis))
+      } finally {
+        query.stop()
+      }
+    }
+    TestUtils.assertExceptionMsg(exc, "Some of partitions in Kafka topic(s) 
have been lost " +

Review Comment:
   Anyway the necessity for improvement of error class framework should not be 
scoped to this PR. We could leverage Kafka data source to dogfood for 3rd party 
data source implementation, but it needs to be different effort.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to