Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15820#discussion_r88963845
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
     private[kafka010] case class CachedKafkaConsumer private(
         topicPartition: TopicPartition,
         kafkaParams: ju.Map[String, Object]) extends Logging {
    +  import CachedKafkaConsumer._
     
       private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
     
    -  private val consumer = {
    +  private var consumer = createConsumer
    +
    +  /** Iterator over the already fetched data */
    +  private var fetchedData =
    +    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
    +  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
    +
    +  /** Create a KafkaConsumer to fetch records for `topicPartition` */
    +  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
         val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
         val tps = new ju.ArrayList[TopicPartition]()
         tps.add(topicPartition)
         c.assign(tps)
         c
       }
     
    -  /** Iterator to the already fetch data */
    -  private var fetchedData =
    -    ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
    -  private var nextOffsetInFetchedData = -2L
    -
       /**
    -   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    -   * Sequential forward access will use buffers, but random access will be horribly inefficient.
    +   * Get the record for the given offset if available. Otherwise it will either throw an error
    +   * (if failOnDataLoss = true) or return the next available offset within [offset, untilOffset).
    +   *
    +   * @param offset the offset to fetch.
    +   * @param untilOffset the max offset to fetch. Exclusive.
    +   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
    +   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
    +   *                       record at `offset` if available, or throw an exception. When
    +   *                       `failOnDataLoss` is `false`, this method will either return the record
    +   *                       at `offset` if available, return the next earliest available record
    +   *                       less than `untilOffset`, or return `null`. It will not throw any
    +   *                       exception.
        */
    -  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +  def get(
    +      offset: Long,
    +      untilOffset: Long,
    +      pollTimeoutMs: Long,
    +      failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +    require(offset < untilOffset,
    +      s"offset must always be less than untilOffset [offset: $offset, untilOffset: $untilOffset]")
         logDebug(s"Get $groupId $topicPartition nextOffset $nextOffsetInFetchedData requested $offset")
    -    if (offset != nextOffsetInFetchedData) {
    -      logInfo(s"Initial fetch for $topicPartition $offset")
    -      seek(offset)
    -      poll(pollTimeoutMs)
    +    // The following loop is basically for `failOnDataLoss = false`. When `failOnDataLoss` is
    +    // `false`, we first try to fetch the record at `offset`. If there is no such record, we
    +    // move to the next available offset within `[offset, untilOffset)` and retry.
    +    // If `failOnDataLoss` is `true`, the loop body is executed only once.
    +    var toFetchOffset = offset
    +    while (toFetchOffset != UNKNOWN_OFFSET) {
    +      try {
    +        return fetchData(toFetchOffset, pollTimeoutMs)
    +      } catch {
    +        case e: OffsetOutOfRangeException =>
    +          // When an error is thrown, it's better to use a new consumer to drop all cached
    +          // states in the old consumer. We don't need to worry about performance because this
    +          // is not the normal path.
    +          resetConsumer()
    +          reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
    +          toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
    +      }
         }
    +    resetFetchedData()
    +    null
    +  }
     
    -    if (!fetchedData.hasNext()) { poll(pollTimeoutMs) }
    -    assert(fetchedData.hasNext(),
    -      s"Failed to get records for $groupId $topicPartition $offset " +
    -        s"after polling for $pollTimeoutMs")
    -    var record = fetchedData.next()
    +  /**
    +   * Return the next earliest available offset in [offset, untilOffset). If all offsets in
    +   * [offset, untilOffset) are invalid (e.g., the topic is deleted and recreated), it will return
    +   * `UNKNOWN_OFFSET`.
    +   */
    +  private def getEarliestAvailableOffsetBetween(offset: Long, untilOffset: Long): Long = {
    +    val (earliestOffset, latestOffset) = getAvailableOffsetRange()
    +    logWarning(s"Some data may be lost. Recovering from the earliest offset: $earliestOffset")
    +    if (offset >= latestOffset || earliestOffset >= untilOffset) {
    +      // [offset, untilOffset) and [earliestOffset, latestOffset) have no overlap,
    +      // either
    +      // --------------------------------------------------------
    +      //         ^                 ^         ^         ^
    +      //         |                 |         |         |
    +      //   earliestOffset   latestOffset   offset   untilOffset
    +      //
    +      // or
    +      // --------------------------------------------------------
    +      //      ^          ^              ^                ^
    +      //      |          |              |                |
    +      //   offset   untilOffset   earliestOffset   latestOffset
    +      val warningMessage =
    +        s"""
    +          |The current available offset range is [$earliestOffset, $latestOffset).
    +          | Offset ${offset} is out of range, and records in [$offset, $untilOffset) will be
    +          | skipped ${additionalMessage(failOnDataLoss = false)}
    +        """.stripMargin
    +      logWarning(warningMessage)
    +      UNKNOWN_OFFSET
    +    } else if (offset >= earliestOffset) {
    +      // -----------------------------------------------------------------------------
    +      //         ^            ^                  ^                                 ^
    +      //         |            |                  |                                 |
    +      //   earliestOffset   offset   min(untilOffset,latestOffset)   max(untilOffset, latestOffset)
    +      //
    +      // This will happen when a topic is deleted and recreated, and new data are pushed very
    +      // fast. In that case we will see `offset` disappear first and then appear again. Although
    +      // the parameters are the same, the state in the Kafka cluster has changed, so the outer
    +      // loop won't be endless.
    +      logWarning(s"Found a disappeared offset $offset. " +
    +        s"Some data may be lost ${additionalMessage(failOnDataLoss = false)}")
    +      offset
    +    } else {
    +      // ------------------------------------------------------------------------------
    +      //      ^           ^                       ^                                  ^
    +      //      |           |                       |                                  |
    +      //   offset   earliestOffset   min(untilOffset,latestOffset)   max(untilOffset, latestOffset)
    +      val warningMessage =
    +        s"""
    +           |The current available offset range is [$earliestOffset, $latestOffset).
    +           | Offset ${offset} is out of range, and records in [$offset, $earliestOffset) will be
    +           | skipped ${additionalMessage(failOnDataLoss = false)}
    +        """.stripMargin
    +      logWarning(warningMessage)
    +      earliestOffset
    +    }
    +  }
     
    -    if (record.offset != offset) {
    -      logInfo(s"Buffer miss for $groupId $topicPartition $offset")
    +  /**
    +   * Get the record at `offset`.
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range
    +   * @throws TimeoutException if the record cannot be fetched within `pollTimeoutMs` milliseconds.
    +   */
    +  private def fetchData(
    +      offset: Long,
    +      pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +    if (offset != nextOffsetInFetchedData || !fetchedData.hasNext()) {
    +      // This is the first fetch, or the last pre-fetched data has been drained.
    +      // Seek to the offset because we may have called seekToBeginning or seekToEnd before this.
           seek(offset)
           poll(pollTimeoutMs)
    -      assert(fetchedData.hasNext(),
    -        s"Failed to get records for $groupId $topicPartition $offset " +
    -          s"after polling for $pollTimeoutMs")
    -      record = fetchedData.next()
    +    }
    +
    +    if (!fetchedData.hasNext()) {
    +      // We cannot fetch anything after `poll`. Two possible cases:
    +      // - `offset` is out of range so that Kafka returns nothing. Just throw
    +      //   `OffsetOutOfRangeException` to let the caller handle it.
    +      // - Cannot fetch any data before timeout. TimeoutException will be thrown.
    +      val (earliestOffset, latestOffset) = getAvailableOffsetRange()
    +      if (offset < earliestOffset || offset >= latestOffset) {
    --- End diff ---
    
    @koeninger Just updated the timeout logic. It now checks the current available offset range and uses it to distinguish between these two cases.
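    
    For reference, a minimal sketch of how that distinction could look. The names `consumer`,
    `topicPartition`, and `getAvailableOffsetRange` follow the diff above; the concrete exception
    types (`OffsetOutOfRangeException` from the Kafka consumer API and
    `java.util.concurrent.TimeoutException`) are an assumption for illustration, not necessarily
    what the final patch uses:
    
        import java.{util => ju}
        import java.util.concurrent.TimeoutException
    
        import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetOutOfRangeException}
        import org.apache.kafka.common.TopicPartition
    
        // Sketch only: assumes a KafkaConsumer[Array[Byte], Array[Byte]] already assigned to a
        // single TopicPartition, mirroring the CachedKafkaConsumer fields shown in the diff above.
        class OffsetRangeCheckSketch(
            consumer: KafkaConsumer[Array[Byte], Array[Byte]],
            topicPartition: TopicPartition) {
    
          /** Query Kafka for the currently available [earliest, latest) offset range. */
          private def getAvailableOffsetRange(): (Long, Long) = {
            val tps = ju.Collections.singletonList(topicPartition)
            consumer.seekToBeginning(tps)
            val earliest = consumer.position(topicPartition)
            consumer.seekToEnd(tps)
            val latest = consumer.position(topicPartition)
            (earliest, latest)
          }
    
          /** Called when `poll` returned nothing for `offset` within `pollTimeoutMs`. */
          def failEmptyPoll(offset: Long, pollTimeoutMs: Long): Nothing = {
            val (earliest, latest) = getAvailableOffsetRange()
            if (offset < earliest || offset >= latest) {
              // Case 1: the requested offset is no longer (or not yet) available, so Kafka
              // legitimately returned nothing. Let the caller decide how to handle data loss.
              throw new OffsetOutOfRangeException(
                ju.Collections.singletonMap(topicPartition, java.lang.Long.valueOf(offset)))
            } else {
              // Case 2: the offset is inside the available range, so the empty poll was a
              // plain timeout rather than data loss.
              throw new TimeoutException(
                s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds")
            }
          }
        }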

