micheal-o commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503219354
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
}
/**
- * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+ * If `failOnDataLoss` is true, this method will throw the exception.
* Otherwise, just log a warning.
*/
- private def reportDataLoss(message: String): Unit = {
+ private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
Review Comment:
This is Scala's call-by-name. I was going to do this, but saw that our style guide asks us to avoid using it:
> Avoid using call by name. Use () => T explicitly.
https://github.com/databricks/scala-style-guide?tab=readme-ov-file#call-by-name
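A minimal sketch of the explicit `() => Throwable` pattern the style guide recommends. It is not Spark's implementation: `DataLossReporter`, its `warnings` buffer, and the stderr logging are hypothetical stand-ins for `KafkaMicroBatchStream` and Spark's logger. The point is that the exception is built only on the failing path, and the `() =>` at each call site makes that deferral visible.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the stream class; `failOnDataLoss` is a plain
// constructor parameter here rather than a parsed source option.
final class DataLossReporter(failOnDataLoss: Boolean) {

  // Messages that were only logged, kept so the lenient path is observable.
  val warnings: ArrayBuffer[String] = ArrayBuffer.empty[String]

  /**
   * If `failOnDataLoss` is true, throw the exception produced by `getException`.
   * Otherwise, just log a warning. `getException` is invoked only when we throw,
   * so the exception object is never allocated on the lenient path.
   */
  def reportDataLoss(message: String, getException: () => Throwable): Unit = {
    if (failOnDataLoss) {
      throw getException()
    } else {
      warnings += message
      System.err.println(s"WARN: $message") // stand-in for logWarning
    }
  }
}

object DataLossReporterDemo {
  def main(args: Array[String]): Unit = {
    var built = 0
    def mkException(): Throwable = { built += 1; new IllegalStateException("data lost") }

    // Lenient mode: the thunk is never called, so no exception is constructed.
    val lenient = new DataLossReporter(failOnDataLoss = false)
    lenient.reportDataLoss("offsets out of range", () => mkException())
    println(s"exceptions built: $built") // exceptions built: 0

    // Strict mode: the thunk is called and its result is thrown.
    val strict = new DataLossReporter(failOnDataLoss = true)
    try strict.reportDataLoss("offsets out of range", () => mkException())
    catch { case e: IllegalStateException => println(s"caught: ${e.getMessage}") }
  }
}
```

A by-name parameter (`exception: => Throwable`) would give the same laziness, but the call site `reportDataLoss(msg, new IllegalStateException(...))` would look like an eager argument. That hidden evaluation is what the style guide's rule is meant to avoid.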
##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -2622,6 +2622,62 @@
],
"sqlState" : "42K0E"
},
+ "KAFKA_DATA_LOSS" : {
+ "message" : [
+ "Some data may have been lost because they are not available in Kafka any more;",
+ "either the data was aged out by Kafka or the topic may have been deleted before all the data in the",
+ "topic was processed.",
+ "If you don't want your streaming query to fail on such cases, set the source option failOnDataLoss to false.",
+ "Reason:"
Review Comment:
Yeah, I copied the message from here and was going to remove it, but forgot. Will remove.
--
This is an automated message from the Apache Git Service.