HyukjinKwon commented on a change in pull request #23324: 
[SPARK-26267][SS]Retry when detecting incorrect offsets from Kafka
URL: https://github.com/apache/spark/pull/23324#discussion_r241968370
 
 

 ##########
 File path: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ##########
 @@ -192,19 +199,76 @@ private[kafka010] class KafkaOffsetReader(
   /**
    * Fetch the latest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].
+   *
+   * Kafka may return earliest offsets when we are requesting latest offsets 
(KAFKA-7703). To avoid
+   * hitting this issue, we will use the given `knownOffsets` to audit the 
latest offsets returned
+   * by Kafka, if we find some incorrect offsets (a latest offset is less than 
an offset in
+   * `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times. If 
`knownOffsets` is not
+   * provided, we simply fetch the latest offsets twice and use the second 
result which is more
+   * likely correct.
+   *
+   * When a topic is recreated, the latest offsets may be less than offsets in 
`knownOffsets`. We
+   * cannot distinguish this with KAFKA-7703, so we just return whatever we 
get from Kafka after
+   * retrying.
    */
-  def fetchLatestOffsets(): Map[TopicPartition, Long] = runUninterruptibly {
+  def fetchLatestOffsets(
+      knownOffsets: Option[PartitionOffsetMap]): PartitionOffsetMap = 
runUninterruptibly {
     withRetriesWithoutInterrupt {
       // Poll to get the latest assigned partitions
       consumer.poll(0)
       val partitions = consumer.assignment()
       consumer.pause(partitions)
       logDebug(s"Partitions assigned to consumer: $partitions. Seeking to the 
end.")
 
-      consumer.seekToEnd(partitions)
-      val partitionOffsets = partitions.asScala.map(p => p -> 
consumer.position(p)).toMap
-      logDebug(s"Got latest offsets for partition : $partitionOffsets")
-      partitionOffsets
+      if (knownOffsets.isEmpty) {
+        // Fetch the latest offsets twice and use the second result which is 
more likely correct.
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+        consumer.seekToEnd(partitions)
+        partitions.asScala.map(p => p -> consumer.position(p)).toMap
+      } else {
+        var partitionOffsets: PartitionOffsetMap = Map.empty
+
+        /**
+         * Compare `knownOffsets` and `partitionOffsets`. Returns all 
partitions that have incorrect
+         * latest offset (offset in `knownOffsets` is great than the one in 
`partitionOffsets`).
+         */
+        def findIncorrectOffsets: Seq[(TopicPartition, Long, Long)] = {
+          var incorrectOffsets = ArrayBuffer[(TopicPartition, Long, Long)]()
+          partitionOffsets.foreach { case (tp, offset) =>
+            knownOffsets.foreach(_.get(tp).foreach { knownOffset =>
+              if (knownOffset > offset) {
+                val incorrectOffset = (tp, knownOffset, offset)
+                incorrectOffsets += incorrectOffset
+              }
+            })
+          }
+          incorrectOffsets
+        }
+
+        // Retry to fetch latest offsets when detecting incorrect offsets. We 
don't use
+        // `withRetriesWithoutInterrupt` to retry because:
+        //
+        // - `withRetriesWithoutInterrupt` will reset the consumer for each 
attempt but a fresh
+        //    consumer has a much bigger chance to hit KAFKA-7703.
+        // - Avoid calling `consumer.poll(0)` which may cause KAFKA-7703.
+        var incorrectOffsets: Seq[(TopicPartition, Long, Long)] = Nil
+        var attempt = 0
+        do {
+          consumer.seekToEnd(partitions)
+          partitionOffsets = partitions.asScala.map(p => p -> 
consumer.position(p)).toMap
+          attempt += 1
+
+          incorrectOffsets = findIncorrectOffsets
+          if (incorrectOffsets.nonEmpty && attempt < maxOffsetFetchAttempts) {
+            logWarning("Retrying to fetch latest offsets because of incorrect 
offsets " +
+              "(partition, previous offset, fetched offset): " + 
incorrectOffsets)
 
 Review comment:
   Not a big deal at all, but I would use string interpolation here. Please 
ignore this comment if the other changes are already ready.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to