Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r211795985
--- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
@@ -80,6 +90,72 @@ private[kafka010] case class InternalKafkaConsumer(
kafkaParams: ju.Map[String, Object]) extends Logging {
import InternalKafkaConsumer._
+  /**
+   * The internal object to store the fetched data from Kafka consumer and the next offset to poll.
+   *
+   * @param records the pre-fetched Kafka records.
+   * @param nextOffsetInFetchedData the next offset in `records`. We use this to verify if we should
+   *                                check if the pre-fetched data is still valid.
+   * @param offsetAfterPoll the Kafka offset after calling `poll`. We will use this offset to poll
+   *                        when `records` is drained.
+   */
+  private case class FetchedData(
+      private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+      var nextOffsetInFetchedData: Long,
--- End diff --
Make this a public getter with a private setter.
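
In case it helps, a minimal sketch of what that could look like. This is simplified and illustrative only: the backing-field names, the `withNewPoll` helper, and the `-2L` sentinel are assumptions for the example, not code from this PR.

```scala
import java.{util => ju}

import org.apache.kafka.clients.consumer.ConsumerRecord

// Sketch: give `nextOffsetInFetchedData` a public getter while keeping the
// setter private to the class. Simplified from the PR's FetchedData.
case class FetchedData(
    private var records: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
    private var _nextOffsetInFetchedData: Long,
    private var _offsetAfterPoll: Long) {

  // Readable from outside the class...
  def nextOffsetInFetchedData: Long = _nextOffsetInFetchedData
  def offsetAfterPoll: Long = _offsetAfterPoll

  // ...but only mutated through methods defined inside it.
  // Hypothetical helper, not taken from the PR.
  def withNewPoll(
      newRecords: ju.ListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
      offsetAfterPoll: Long): this.type = {
    records = newRecords
    _nextOffsetInFetchedData = -2L // placeholder "unknown offset" sentinel
    _offsetAfterPoll = offsetAfterPoll
    this
  }
}
```

That way callers can still read the offsets, but any state transition has to go through a method on `FetchedData` itself.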
---