GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r210985375
  
    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer(
         consumer.seek(topicPartition, offset)
       }
     
    -  private def poll(pollTimeoutMs: Long): Unit = {
    +  /**
    +   * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
    +   * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible
    +   * messages (either transaction messages, or aborted messages when `isolation.level` is
    +   * `read_committed`).
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
    +   */
    +  private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
    +    seek(offset)
         val p = consumer.poll(pollTimeoutMs)
         val r = p.records(topicPartition)
         logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
    -    fetchedData = r.iterator
    +    offsetAfterPoll = consumer.position(topicPartition)
    --- End diff --
    
    I strongly think that this should not be a var, but rather a clear return
    value. We have been burnt by too many mutable vars/defs (see all the
    flakiness caused by the structured ProgressReporter), and we should
    consciously try to improve this everywhere by not having vars all over
    the place.
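
To make the suggestion concrete, here is a minimal sketch of `poll` returning its result instead of assigning to `fetchedData` and `offsetAfterPoll`. It is not the code from the PR: `Record`, `PollResult`, `FakeConsumer`, and `PollSketch` are hypothetical names, and `FakeConsumer` is a stand-in for the Kafka consumer so that the example runs without a broker.

```scala
// Hypothetical sketch: only the names `poll`, `seek`, and `offsetAfterPoll`
// are borrowed from the diff. Everything else is made up for illustration.
final case class Record(offset: Long, value: String)

// Everything one poll produces, bundled into an immutable value.
final case class PollResult(records: Vector[Record], offsetAfterPoll: Long)

// Stand-in for the (inherently stateful) Kafka consumer.
final class FakeConsumer(log: Vector[Record]) {
  private var position: Long = 0L

  def seek(offset: Long): Unit = position = offset

  // Returns up to `maxRecords` records at or after the current position
  // and advances the position past the last record returned.
  def poll(maxRecords: Int): Vector[Record] = {
    val batch = log.filter(_.offset >= position).take(maxRecords)
    if (batch.nonEmpty) position = batch.last.offset + 1
    batch
  }

  def currentPosition: Long = position
}

object PollSketch {
  // The caller receives the fetched records and the new position as a return
  // value, so no field on the enclosing class is mutated as a side effect.
  def poll(consumer: FakeConsumer, offset: Long, maxRecords: Int): PollResult = {
    consumer.seek(offset)
    val records = consumer.poll(maxRecords)
    PollResult(records, consumer.currentPosition)
  }
}
```

With this shape, the caller decides what to do with `PollResult` (for example, compare `offsetAfterPoll` with the requested offset to detect a poll that fetched nothing), and the only remaining mutable state is inside the consumer itself.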

