Github user koeninger commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169537541
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
            s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
     -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
     +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
     +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
     +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
     +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
     +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
    That's a "shouldn't happen unless the TopicPartition or broker is gone" kind of thing. Semantically I could see it being more like `require` than `assert`, but I don't have a strong opinion.
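
    For context on the `require`-vs-`assert` distinction raised here, a minimal standalone sketch (not part of the PR) contrasting the two Scala checks:

    ```scala
    // Hypothetical illustration: require signals a violated caller precondition
    // (IllegalArgumentException), while assert signals a broken internal
    // invariant (AssertionError) and can be compiled out with
    // -Xdisable-assertions.
    object AssertVsRequire {
      def checkRequire(): String =
        try { require(false, "precondition violated"); "no exception" }
        catch { case _: IllegalArgumentException => "IllegalArgumentException" }

      def checkAssert(): String =
        try { assert(false, "invariant broken"); "no exception" }
        catch { case _: AssertionError => "AssertionError" }

      def main(args: Array[String]): Unit = {
        println(checkRequire())
        println(checkAssert())
      }
    }
    ```

    Since a missing TopicPartition or broker is an environmental failure rather than a caller error, either choice is defensible, which matches the "no strong opinion" above.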


---
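
The new error message in the diff points users at `spark.streaming.kafka.allowNonConsecutiveOffsets`. As a hedged configuration sketch (the app name and surrounding setup are illustrative, not from this PR), the flag would be set on the `SparkConf` before the streaming context is created:

```scala
import org.apache.spark.SparkConf

// Illustrative only: opt in to non-consecutive offsets so batches on
// compacted topics tolerate the gaps left by log compaction.
val conf = new SparkConf()
  .setAppName("compacted-topic-example") // hypothetical app name
  .set("spark.streaming.kafka.allowNonConsecutiveOffsets", "true")
```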
