GitHub user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/22138#discussion_r214803861
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
}
}
- /** Create a new consumer and reset cached states */
- private def resetConsumer(): Unit = {
- consumer.close()
- consumer = createConsumer
- fetchedData.reset()
+ /**
+ * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
+ * empty if the Kafka consumer fetches some messages but all of them are not visible messages
+ * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
+ *
+ * @throws OffsetOutOfRangeException if `offset` is out of range.
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the
+ * consumer polls nothing before timeout.
+ */
+ private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+ val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+ fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+ }
+
+ private def ensureConsumerAvailable(): Unit = {
+ if (consumer == null) {
--- End diff --
Why not use an `Option` here instead of checking for `null`?
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]