HeartSaVioR commented on code in PR #38911:
URL: https://github.com/apache/spark/pull/38911#discussion_r1041860498


##########
connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala:
##########
@@ -316,6 +320,50 @@ private[kafka010] class KafkaMicroBatchStream(
     }
   }
 
+  private def assertEndOffsetForTriggerAvailableNow(
+      endPartitionOffsets: Map[TopicPartition, Long]): Unit = {
+    val tpsForPrefetched = allDataForTriggerAvailableNow.keySet
+    val tpsForEndOffset = endPartitionOffsets.keySet
+
+    if (tpsForPrefetched != tpsForEndOffset) {
+      throw KafkaExceptions.topicPartitionsInEndOffsetAreNotSameWithPrefetched(
+        tpsForPrefetched, tpsForEndOffset)
+    }
+
+    val endOffsetHasGreaterThanPrefetched = {
+      allDataForTriggerAvailableNow.keySet.exists { tp =>
+        val offsetFromPrefetched = allDataForTriggerAvailableNow(tp)
+        val offsetFromEndOffset = endPartitionOffsets(tp)
+        offsetFromEndOffset > offsetFromPrefetched
+      }
+    }
+    if (endOffsetHasGreaterThanPrefetched) {
+      throw 
KafkaExceptions.endOffsetHasGreaterOffsetForTopicPartitionThanPrefetched(
+        allDataForTriggerAvailableNow, endPartitionOffsets)
+    }
+
+    val latestOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(endPartitionOffsets))

Review Comment:
   If we don't fetch the latest offset with latest topic-partitions from Kafka 
again, what we are trying to guard against? If someone turns on failOnDataLoss 
and runs the query with Trigger.AvailableNow, it will just move on when 
specific topic partition is dropped, leading that it never reaches the end 
state (prepared offset).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to