Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22042#discussion_r210985375
--- Diff:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
---
@@ -346,11 +385,40 @@ private[kafka010] case class InternalKafkaConsumer(
consumer.seek(topicPartition, offset)
}
- private def poll(pollTimeoutMs: Long): Unit = {
+  /**
+   * Poll messages from Kafka starting from `offset` and set `fetchedData` and `offsetAfterPoll`.
+   * `fetchedData` may be empty if Kafka fetches some messages but none of them are visible
+   * (either transactional control messages, or aborted messages when `isolation.level` is
+   * `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position did not change after polling, meaning
+   *                          the consumer polled nothing before the timeout.
+   */
+ private def poll(offset: Long, pollTimeoutMs: Long): Unit = {
+ seek(offset)
val p = consumer.poll(pollTimeoutMs)
val r = p.records(topicPartition)
logDebug(s"Polled $groupId ${p.partitions()} ${r.size}")
- fetchedData = r.iterator
+ offsetAfterPoll = consumer.position(topicPartition)
--- End diff ---
I strongly think that this should not be a var, but rather a clear return
value. We have been burnt by too many mutable vars/defs (see all the flakiness
caused by the structured ProgressReporter), and we should consciously try to
improve this everywhere by not having vars all over the place.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]