GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15820#discussion_r89018842
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala ---
    @@ -34,53 +38,211 @@ import org.apache.spark.internal.Logging
     private[kafka010] case class CachedKafkaConsumer private(
         topicPartition: TopicPartition,
         kafkaParams: ju.Map[String, Object]) extends Logging {
    +  import CachedKafkaConsumer._
     
      private val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
     
    -  private val consumer = {
    +  private var consumer = createConsumer
    +
    +  /** Iterator to the already fetched data */
    +  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
    +  private var nextOffsetInFetchedData = UNKNOWN_OFFSET
    +
    +  /** Create a KafkaConsumer to fetch records for `topicPartition` */
    +  private def createConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
         val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
         val tps = new ju.ArrayList[TopicPartition]()
         tps.add(topicPartition)
         c.assign(tps)
         c
       }
     
    -  /** Iterator to the already fetched data */
    -  private var fetchedData = ju.Collections.emptyIterator[ConsumerRecord[Array[Byte], Array[Byte]]]
    -  private var nextOffsetInFetchedData = -2L
    -
       /**
    -   * Get the record for the given offset, waiting up to timeout ms if IO is necessary.
    -   * Sequential forward access will use buffers, but random access will be horribly inefficient.
    +   * Get the record for the given offset if available. Otherwise it will either throw an error
    +   * (if failOnDataLoss = true), or return the next available offset within [offset, untilOffset).
    +   *
    +   * @param offset the offset to fetch.
    +   * @param untilOffset the max offset to fetch. Exclusive.
    +   * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka.
    +   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return the
    +   *                       record at offset if available, or throw an exception. When
    +   *                       `failOnDataLoss` is `false`, this method will either return the record
    +   *                       at offset if available, or return the next earliest available record
    +   *                       less than untilOffset, or null. It will not throw any exception.
        */
    -  def get(offset: Long, pollTimeoutMs: Long): ConsumerRecord[Array[Byte], Array[Byte]] = {
    +  def get(
    --- End diff --
    
    Can you also document that it can return `null`?
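    
    For example, a caller written against this contract needs an explicit
    null check. A rough sketch of what that looks like (`readRange` and the
    buffering are illustrative, not part of this PR; it assumes the new
    four-argument `get` shown in the diff above):
    
        import org.apache.kafka.clients.consumer.ConsumerRecord
    
        // Hypothetical caller: reads [fromOffset, untilOffset) with failOnDataLoss = false.
        def readRange(
            consumer: CachedKafkaConsumer,
            fromOffset: Long,
            untilOffset: Long,
            pollTimeoutMs: Long): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
          val buf = scala.collection.mutable.ArrayBuffer.empty[ConsumerRecord[Array[Byte], Array[Byte]]]
          var offset = fromOffset
          while (offset < untilOffset) {
            val r = consumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss = false)
            if (r == null) {
              // Nothing else is available in [offset, untilOffset); without the
              // documented null return, a caller could easily NPE here.
              offset = untilOffset
            } else {
              buf += r
              // The returned record may be past the requested offset when data
              // was lost, so resume from the record actually returned.
              offset = r.offset + 1
            }
          }
          buf.toSeq
        }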

