WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503171103
##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala:
##########
@@ -92,13 +93,18 @@ class KafkaContinuousStream(
val deletedPartitions =
oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
if (deletedPartitions.nonEmpty) {
- val message = if (
- offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
- s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+ if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+ reportDataLoss(
+ s"$deletedPartitions are gone.${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+ () =>
+ QueryExecutionErrors.partitionsDeletedAndGroupIdConfigPresentKafkaError(
+ deletedPartitions.toString,
+ ConsumerConfig.GROUP_ID_CONFIG))
} else {
- s"$deletedPartitions are gone. Some data may have been missed."
+ reportDataLoss(
+ s"$deletedPartitions are gone. Some data may have been missed.",
+ () => QueryExecutionErrors.partitionsDeletedKafkaError(deletedPartitions.toString))
Review Comment:
It took me a while to understand this logic, and the same pattern appears more than once.
Could we simplify it? There are two functions with similar objectives:
`partitionsDeletedKafkaError` and `partitionsDeletedAndGroupIdConfigPresentKafkaError`.
If we merge them into one function,
`partitionsDeletedKafkaError(partitions: String, groupIdConfig: Option[String])`,
and `match` on `groupIdConfig` inside it, that function can set the correct error
class and message parameters.
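A minimal sketch of what the merged helper could look like, kept self-contained so it compiles without Spark. `KafkaDataLossError` is a hypothetical stand-in for the exceptions `QueryExecutionErrors` builds, and the error class names and message-parameter keys are placeholders, not the actual entries in Spark's error-class definitions:

```scala
// Hypothetical stand-in for the error-class based exceptions that
// QueryExecutionErrors builds; only the fields this sketch needs are modeled.
final class KafkaDataLossError(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends RuntimeException(s"[$errorClass] " + messageParameters.mkString(", "))

// Single replacement for partitionsDeletedKafkaError and
// partitionsDeletedAndGroupIdConfigPresentKafkaError: matching on the Option
// picks the error class and the message parameters in one place.
def partitionsDeletedKafkaError(
    partitions: String,
    groupIdConfig: Option[String]): KafkaDataLossError = groupIdConfig match {
  case Some(config) =>
    new KafkaDataLossError(
      "KAFKA_DATA_LOSS.PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT", // placeholder name
      Map("partitions" -> partitions, "groupIdConfig" -> config))
  case None =>
    new KafkaDataLossError(
      "KAFKA_DATA_LOSS.PARTITIONS_DELETED", // placeholder name
      Map("partitions" -> partitions))
}
```

Because callers only pass an `Option`, call sites no longer need their own `if`/`else` just to choose between two error helpers.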
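Then the logic here could be cleaned up to something like:
```scala
// Turn the raw map value (null when group.id is unset) into an Option[String]
val groupIdConfig = Option(offsetReader.driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)).map(_.toString)
val message = if (groupIdConfig.isDefined) s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
  else s"$deletedPartitions are gone. Some data may have been missed."
reportDataLoss(message, () =>
  QueryExecutionErrors.partitionsDeletedKafkaError(deletedPartitions.toString, groupIdConfig))
```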
--
This is an automated message from the Apache Git Service.