HeartSaVioR commented on a change in pull request #25135: [SPARK-28367][SS] Use
new KafkaConsumer.poll API in Kafka connector
URL: https://github.com/apache/spark/pull/25135#discussion_r303804331
##########
File path:
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
##########
@@ -419,6 +416,19 @@ private[kafka010] class KafkaOffsetReader(
stopConsumer()
_consumer = null // will automatically get reinitialized again
}
+
+ private def getPartitions(): ju.Set[TopicPartition] = {
+ var partitions = Set.empty[TopicPartition].asJava
+ val startTimeMs = System.currentTimeMillis()
+ while (partitions.isEmpty && System.currentTimeMillis() - startTimeMs < pollTimeoutMs) {
+ // Poll to get the latest assigned partitions
+ consumer.poll(jt.Duration.ofMillis(100))
Review comment:
Please correct me if I'm missing something here. (I'm not a Kafka expert, so I
may be missing some details.)
This call may return only after 100 ms if there is no record to consume, even
when the metadata is already available. Here we only need the metadata, but once
we call poll, the request is bound to the records.
To be clear, ideally we would call `poll(0)` with an explicit sleep (to avoid
coupling with records), but it would also be OK to let `consumer.poll` wait
instead. An explicit sleep may wait more accurately if there is a case where a
record is ready to poll but the metadata is not (I'm not sure whether this is
possible).
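A minimal sketch of the explicit-sleep variant described above, not the PR's actual code. The object `MetadataWait`, the helper `waitForNonEmpty`, and its parameters are hypothetical names introduced for illustration. The `fetch` function stands in for a non-blocking metadata check, assumed here to be something like `consumer.poll(jt.Duration.ZERO)` followed by reading `consumer.assignment()`; no Kafka classes are used so the block stays self-contained.

```scala
import scala.annotation.tailrec

// Hypothetical helper: retries a non-blocking fetch with an explicit sleep,
// so the wait is not tied to whether records are available.
object MetadataWait {

  /**
   * Calls `fetch` until it returns a non-empty set or `timeoutMs` has elapsed,
   * sleeping up to `sleepMs` between attempts. Returns the last fetched set,
   * which is empty if the timeout expired first.
   */
  def waitForNonEmpty[T](timeoutMs: Long, sleepMs: Long)(fetch: () => Set[T]): Set[T] = {
    val deadlineNanos = System.nanoTime() + timeoutMs * 1000000L

    @tailrec
    def loop(): Set[T] = {
      val result = fetch()
      val remainingNanos = deadlineNanos - System.nanoTime()
      if (result.nonEmpty || remainingNanos <= 0) {
        result
      } else {
        // Never sleep past the deadline; sleep at least 1 ms to avoid a busy loop.
        val remainingMs = math.max(remainingNanos / 1000000L, 1L)
        Thread.sleep(math.min(sleepMs, remainingMs))
        loop()
      }
    }

    loop()
  }
}
```

In `getPartitions()`, the `fetch` argument might wrap the zero-timeout poll and the assignment lookup, with `pollTimeoutMs` as the overall timeout. Whether a zero-timeout poll makes enough progress on metadata per call is an assumption that would need checking against the Kafka client in use.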