WweiL commented on code in PR #45221:
URL: https://github.com/apache/spark/pull/45221#discussion_r1503171103


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala:
##########
@@ -92,13 +93,18 @@ class KafkaContinuousStream(
 
     val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
     if (deletedPartitions.nonEmpty) {
-      val message = if (
-        offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
-        s"$deletedPartitions are gone. ${CUSTOM_GROUP_ID_ERROR_MESSAGE}"
+      if (offsetReader.driverKafkaParams.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+        reportDataLoss(
+          s"$deletedPartitions are gone.${CUSTOM_GROUP_ID_ERROR_MESSAGE}",
+          () =>
+            QueryExecutionErrors.partitionsDeletedAndGroupIdConfigPresentKafkaError(
+              deletedPartitions.toString,
+              ConsumerConfig.GROUP_ID_CONFIG))
       } else {
-        s"$deletedPartitions are gone. Some data may have been missed."
+        reportDataLoss(
+          s"$deletedPartitions are gone. Some data may have been missed.",
+          () => QueryExecutionErrors.partitionsDeletedKafkaError(deletedPartitions.toString))

Review Comment:
   It took me a while to understand this logic, and the same pattern appears more than once.
   
   Could we simplify the logic? There are two functions with similar roles: `partitionsDeletedKafkaError` and `partitionsDeletedAndGroupIdConfigPresentKafkaError`. If we merge them into one function, `partitionsDeletedKafkaError(partitions: String, groupIdConfig: Option[String])`, and match on `groupIdConfig`, that function can set the correct error class and message parameters.
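   
   A rough, self-contained sketch of what the merged function might look like. `KafkaDataLossError`, `QueryExecutionErrorsSketch`, and the error-class strings are placeholders invented for illustration, not Spark's actual definitions; the real function would build whatever exception type and error classes `QueryExecutionErrors` already uses:

   ```scala
   // Hypothetical stand-in for the exception Spark would raise (not a real Spark type).
   final case class KafkaDataLossError(
       errorClass: String,
       messageParameters: Map[String, String])
     extends RuntimeException(s"[$errorClass] $messageParameters")

   // Hypothetical container; in the PR this would live in QueryExecutionErrors.
   object QueryExecutionErrorsSketch {
     // A single entry point: the Option decides the error class and parameters.
     def partitionsDeletedKafkaError(
         partitions: String,
         groupIdConfig: Option[String]): KafkaDataLossError =
       groupIdConfig match {
         case Some(groupId) =>
           KafkaDataLossError(
             "PLACEHOLDER_PARTITIONS_DELETED_AND_GROUP_ID_CONFIG_PRESENT",
             Map("partitions" -> partitions, "groupIdConfig" -> groupId))
         case None =>
           KafkaDataLossError(
             "PLACEHOLDER_PARTITIONS_DELETED",
             Map("partitions" -> partitions))
       }
   }
   ```

   With this shape, callers no longer choose between two error functions; they only pass an `Option`.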
   
   Then we can clean up the logic here by doing something like:
   ```
   val message = if () ... else () ...
   reportDataLoss(
     message,
     () => QueryExecutionErrors.partitionsDeletedKafkaError(
       deletedPartitions.toString,
       Option(offsetReader.driverKafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG))))
   ```


