Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22042#discussion_r210985375

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer(
         consumer.seek(topicPartition, offset)
       }

    -  private def poll(pollTimeoutMs: Long): Unit = {
    +  /**
    +   * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
    +   * `fetchedData` may be empty if the Kafka fetches some messages but all of them are not visible
    +   * messages (either transaction messages, or aborted messages when `isolation.level` is
    +   * `read_committed`).
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
    +   */
    +  private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
    +    seek(offset)
         val p = consumer.poll(pollTimeoutMs)
         val r = p.records(topicPartition)
         logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
    -    fetchedData = r.iterator
    +    offsetAfterPoll = consumer.position(topicPartition)
    --- End diff --

    I strongly think that this should not be a var, but rather a clear return value. We have been burnt by too many mutable vars/defs (see all the flakiness caused by the structured ProgressReporter), and we should consciously try to improve this everywhere by not having vars all over the place.
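
    For illustration only, here is a minimal sketch of what "a clear return value" could look like. It is not the code from the PR: `RecordSource`, `PollResult`, `Fetcher` and `InMemorySource` are hypothetical names, and `RecordSource` is a simplified stand-in rather than the real Kafka consumer API. The timeout and out-of-range handling described in the doc comment is left out.

```scala
// Hypothetical stand-in for the Kafka consumer; NOT the real KafkaConsumer API.
trait RecordSource[A] {
  def seek(offset: Long): Unit
  def poll(timeoutMs: Long): Seq[A]
  def position: Long
}

// Everything one poll produced, handed back to the caller
// instead of being stored in `fetchedData` / `offsetAfterPoll` vars.
final case class PollResult[A](records: Seq[A], offsetAfterPoll: Long)

final class Fetcher[A](source: RecordSource[A]) {
  /** Seeks to `offset`, polls once, and returns the records plus the new position. */
  def poll(offset: Long, pollTimeoutMs: Long): PollResult[A] = {
    source.seek(offset)
    val records = source.poll(pollTimeoutMs)
    PollResult(records, source.position)
  }
}

// In-memory source used only to exercise the sketch (assumption, not from the PR).
final class InMemorySource(data: Vector[String], batchSize: Int)
    extends RecordSource[String] {
  // The fake's own cursor; the caller never reads or writes it directly.
  private var pos = 0L

  def seek(offset: Long): Unit = { pos = offset }

  def poll(timeoutMs: Long): Seq[String] = {
    val batch = data.slice(pos.toInt, pos.toInt + batchSize)
    pos += batch.size
    batch
  }

  def position: Long = pos
}
```

    With this shape the caller threads the offset through explicitly, e.g. `val next = fetcher.poll(prev.offsetAfterPoll, timeoutMs)`, so there is no hidden state that has to be kept in sync between calls.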