WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503124502
##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -2622,6 +2622,62 @@
],
"sqlState" : "42K0E"
},
+ "KAFKA_DATA_LOSS" : {
+ "message" : [
+ "Some data may have been lost because they are not available in Kafka
any more;",
+ "either the data was aged out by Kafka or the topic may have been
deleted before all the data in the",
+ "topic was processed.",
+ "If you don't want your streaming query to fail on such cases, set the
source option failOnDataLoss to false.",
+ "Reason:"
Review Comment:
I think this replaces `INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE` defined in
`KafkaSourceProvider`. Do we want to remove it, or is this a first step,
with other error classes that depend on this one still to come?
##########
connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala:
##########
@@ -904,7 +904,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
testUtils.sendMessages(topic2, Array("6"))
},
StartStream(),
- ExpectFailure[IllegalStateException](e => {
+ ExpectFailure[SparkException](e => {
Review Comment:
I'm hesitant to change the exception to `SparkException` here, for backward
compatibility: existing code might expect `IllegalStateException` and handle
it. Can we define a class that extends `IllegalStateException` instead of
using `SparkException`?
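One possible shape for that, sketched below with illustrative names (the class name, the error-class string, and the use of Spark's internal `SparkThrowableHelper` are assumptions, not taken from this PR): an error-class-aware exception that still extends `IllegalStateException`, so existing `catch` blocks keep working while the error framework sees a `SparkThrowable`.

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.{SparkThrowable, SparkThrowableHelper}

// Hypothetical sketch only: callers that catch IllegalStateException
// keep working, while error-class metadata is still carried along.
class KafkaDataLossException(
    errorClass: String,
    messageParameters: Map[String, String])
  extends IllegalStateException(
    SparkThrowableHelper.getMessage(errorClass, messageParameters))
  with SparkThrowable {

  override def getErrorClass: String = errorClass

  override def getMessageParameters: java.util.Map[String, String] =
    messageParameters.asJava
}
```

With something like this, the test above could keep `ExpectFailure[IllegalStateException]` unchanged.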
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala:
##########
@@ -92,13 +93,18 @@ class KafkaContinuousStream(
       val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
       if (deletedPartitions.nonEmpty) {
-        val message = if (
-            offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-          s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+        if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+          reportDataLoss(
+            s"$deletedPartitions are gone.${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+            () => QueryExecutionErrors.partitionsDeletedAndGroupIdConfigPresentKafkaError(
+              deletedPartitions.toString, ConsumerConfig.GROUP_ID_CONFIG))
         } else {
-          s"$deletedPartitions are gone. Some data may have been missed."
+          reportDataLoss(
+            s"$deletedPartitions are gone. Some data may have been missed.",
+            () => QueryExecutionErrors.partitionsDeletedKafkaError(deletedPartitions.toString))
Review Comment:
It took me a bit to understand this logic, and it appears more than once.
Could we simplify it? There are two functions with similar roles:
`partitionsDeletedKafkaError` and
`partitionsDeletedAndGroupIdConfigPresentKafkaError`. If we merged them into
a single `partitionsDeletedKafkaError(partitions: String, groupIdConfig:
Option[String])` and matched on `groupIdConfig`, the correct error class and
message parameters could be set in one place.
Then the logic here could be cleaned up to something like:
```
val message = if (...) ... else ...
reportDataLoss(message,
  () => QueryExecutionErrors.partitionsDeletedKafkaError(deletedPartitions.toString,
    Option(offsetReader.driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG))))
```
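For reference, the merged builder could look roughly like the sketch below. This is only an illustration: the error-class names are made up (they would need to match entries in `error-classes.json`), and it assumes the `SparkException` constructor that takes `errorClass`/`messageParameters`/`cause`.

```scala
import org.apache.spark.SparkException

// Sketch of the merged helper: one entry point, matching on whether a
// custom group id was configured. Error-class names are illustrative.
def partitionsDeletedKafkaError(
    partitions: String,
    groupIdConfig: Option[String]): Throwable = groupIdConfig match {
  case Some(groupId) =>
    new SparkException(
      errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT",
      messageParameters = Map("partitions" -> partitions, "groupIdConfig" -> groupId),
      cause = null)
  case None =>
    new SparkException(
      errorClass = "KAFKA_DATA_LOSS.PARTITIONS_DELETED",
      messageParameters = Map("partitions" -> partitions),
      cause = null)
}
```

That way the `if`/`else` at each call site only has to pick the log message, not the exception.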
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -302,12 +302,12 @@ private[kafka010] class KafkaMicroBatchStream(
   }

   /**
-   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * If `failOnDataLoss` is true, this method will throw the exception.
    * Otherwise, just log a warning.
    */
-  private def reportDataLoss(message: String): Unit = {
+  private def reportDataLoss(message: String, getException: () => Throwable): Unit = {
Review Comment:
I think the reason you are defining `getException` as `() => Throwable` is
to avoid unneeded computation of `getException()` in the `else` branch?
In that case, a by-name parameter would also work:
```
private def reportDataLoss(message: String, getException: => Throwable): Unit = {
  if (failOnDataLoss) {
    throw getException
  } else {
    logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
  }
}
```
Some examples in the code base
https://github.com/apache/spark/blob/b7763a7eae2b9609012cbb4ce981276c6471cc4e/common/utils/src/main/scala/org/apache/spark/internal/Logging.scala#L59-L61
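As a standalone illustration of the point (names here are made up for the demo, not from the Spark code): a by-name parameter `=> Throwable` delays evaluation just like `() => Throwable`, but the call site passes a plain expression instead of a thunk.

```scala
object ByNameDemo {
  var built = 0

  // By-name parameter: the argument expression is evaluated only
  // when (and each time) `getException` is referenced in the body.
  def report(message: String, getException: => Throwable, fail: Boolean): Unit = {
    if (fail) throw getException
    else println(s"WARN: $message") // exception never constructed on this path
  }

  def main(args: Array[String]): Unit = {
    def build(): Throwable = { built += 1; new IllegalStateException("data loss") }

    report("partitions gone", build(), fail = false)
    assert(built == 0) // warning path: build() was never evaluated

    try report("partitions gone", build(), fail = true)
    catch { case _: IllegalStateException => }
    assert(built == 1) // throwing path: build() evaluated exactly once
  }
}
```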
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]