GitHub user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169663225
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -83,13 +81,50 @@ class CachedKafkaConsumer[K, V] private(
             s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
           assert(record.offset == offset,
    -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
    +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
    +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
    +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
    +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
    +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    +    assert(buffer.hasNext(),
    --- End diff --
    
     `assert` here is Scala's `Predef.assert`, which (I think?) is elidable at
     compile time rather than being a JVM assert, so it would be turned off if
     assertions are disabled, which is how it ought to run in production. If this
     is something that _could_ happen at all, I think it should be `require`.
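
     To illustrate the difference, here is a minimal sketch, not code from the
     PR. `AssertVsRequire`, `nextWithAssert`, and `nextWithRequire` are
     hypothetical names, and an `Iterator[Int]` stands in for the consumer's
     record buffer. It assumes Scala 2, where `Predef.assert` is annotated
     `@elidable(ASSERTION)` and `Predef.require` is not.

```scala
object AssertVsRequire {
  // Hypothetical stand-in for the buffer check in compactedNext.
  def nextWithAssert(buffer: Iterator[Int]): Int = {
    // Predef.assert is elidable: scalac can drop this call entirely
    // (for example with -Xdisable-assertions), leaving no check at all.
    assert(buffer.hasNext, "buffer is empty after poll")
    buffer.next()
  }

  def nextWithRequire(buffer: Iterator[Int]): Int = {
    // Predef.require is not elidable, so this check always runs.
    // On failure it throws IllegalArgumentException.
    require(buffer.hasNext, "buffer is empty after poll")
    buffer.next()
  }
}
```

     With assertions compiled out, `nextWithAssert` on an empty buffer would
     fall through to `buffer.next()` and fail there with a less informative
     error, while `nextWithRequire` still fails at the check with the intended
     message.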

